ffmpeg-python is not used anymore - ffmpeg is directly called through create_subprocess_exec

This commit is contained in:
Quentin Fuxa
2025-07-01 18:53:35 +02:00
parent 8a175e79d8
commit f25de6d8a4
2 changed files with 141 additions and 439 deletions

View File

@@ -167,6 +167,9 @@ class AudioProcessor:
if state == FFmpegState.FAILED:
logger.error("FFmpeg is in FAILED state, cannot read data")
break
elif state == FFmpegState.STOPPED:
logger.info("FFmpeg is stopped")
break
elif state != FFmpegState.RUNNING:
logger.warning(f"FFmpeg is in {state} state, waiting...")
await asyncio.sleep(0.5)

View File

@@ -1,13 +1,31 @@
import asyncio
import ffmpeg
import logging
import time
import traceback
from typing import Optional, Callable
from concurrent.futures import ThreadPoolExecutor
from enum import Enum
from typing import Optional, Callable
import contextlib
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
ERROR_INSTALL_INSTRUCTIONS = """
FFmpeg is not installed or not found in your system's PATH.
Please install FFmpeg to enable audio processing.
Installation instructions:
# Ubuntu/Debian:
sudo apt update && sudo apt install ffmpeg
# macOS (using Homebrew):
brew install ffmpeg
# Windows:
# 1. Download the latest static build from https://ffmpeg.org/download.html
# 2. Extract the archive (e.g., to C:\\FFmpeg).
# 3. Add the 'bin' directory (e.g., C:\\FFmpeg\\bin) to your system's PATH environment variable.
After installation, please restart the application.
"""
class FFmpegState(Enum):
STOPPED = "stopped"
@@ -16,479 +34,160 @@ class FFmpegState(Enum):
RESTARTING = "restarting"
FAILED = "failed"
class CircuitBreaker:
"""to prevent endless restart loops."""
def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60.0):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.failure_count = 0
self.last_failure_time = 0
self.is_open = False
def record_success(self):
self.failure_count = 0
self.is_open = False
def record_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.failure_count >= self.failure_threshold:
self.is_open = True
logger.error(f"Circuit breaker opened after {self.failure_count} failures")
def can_attempt(self) -> bool:
if not self.is_open:
return True
if time.time() - self.last_failure_time > self.recovery_timeout:
logger.info("Circuit breaker recovery timeout reached, attempting reset")
self.is_open = False
self.failure_count = 0
return True
return False
class FFmpegManager:
def __init__(self, sample_rate: int = 16000, channels: int = 1,
max_retries: int = 3, restart_delay: float = 1.0):
def __init__(self, sample_rate: int = 16000, channels: int = 1):
self.sample_rate = sample_rate
self.channels = channels
self.max_retries = max_retries
self.restart_delay = restart_delay
self.process: Optional[object] = None
self.process: Optional[asyncio.subprocess.Process] = None
self._stderr_task: Optional[asyncio.Task] = None
self.on_error_callback: Optional[Callable[[str], None]] = None
self.state = FFmpegState.STOPPED
self.state_lock = asyncio.Lock()
self.circuit_breaker = CircuitBreaker()
self.executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="ffmpeg")
self._write_queue = asyncio.Queue(maxsize=100)
self._write_task: Optional[asyncio.Task] = None
self._monitor_task: Optional[asyncio.Task] = None
self._stderr_task : Optional[asyncio.Task] = None
self.last_activity = time.time()
self.on_data_callback: Optional[Callable] = None
self.on_error_callback: Optional[Callable] = None
self._restart_lock = asyncio.Lock()
self._restart_in_progress = False
async def start(self):
async with self.state_lock:
self._state_lock = asyncio.Lock()
async def start(self) -> bool:
async with self._state_lock:
if self.state != FFmpegState.STOPPED:
logger.warning(f"Cannot start FFmpeg, current state: {self.state}")
logger.warning(f"FFmpeg already running in state: {self.state}")
return False
self.state = FFmpegState.STARTING
try:
if not self.circuit_breaker.can_attempt():
logger.error("CB is open, cannot start FFmpeg")
async with self.state_lock:
self.state = FFmpegState.FAILED
return False
# Start FFmpeg process
success = await self._start_process()
if not success:
self.circuit_breaker.record_failure()
async with self.state_lock:
self.state = FFmpegState.FAILED
return False
self.circuit_breaker.record_success()
# Start background tasks
self._write_task = asyncio.create_task(self._write_worker())
self._monitor_task = asyncio.create_task(self._monitor_health())
self._stderr_task = asyncio.create_task(self._read_stderr())
async with self.state_lock:
cmd = [
"ffmpeg",
"-hide_banner",
"-loglevel", "error",
"-i", "pipe:0",
"-f", "s16le",
"-acodec", "pcm_s16le",
"-ac", str(self.channels),
"-ar", str(self.sample_rate),
"pipe:1"
]
self.process = await asyncio.create_subprocess_exec(
*cmd,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
self._stderr_task = asyncio.create_task(self._drain_stderr())
async with self._state_lock:
self.state = FFmpegState.RUNNING
logger.info("FFmpeg manager started ok")
logger.info("FFmpeg started.")
return True
except Exception as e:
logger.error(f"Failed to start FFmpeg manager: {e}")
logger.error(traceback.format_exc())
self.circuit_breaker.record_failure()
async with self.state_lock:
except FileNotFoundError:
logger.error(ERROR_INSTALL_INSTRUCTIONS)
async with self._state_lock:
self.state = FFmpegState.FAILED
if self.on_error_callback:
await self.on_error_callback("ffmpeg_not_found")
return False
except Exception as e:
logger.error(f"Error starting FFmpeg: {e}")
async with self._state_lock:
self.state = FFmpegState.FAILED
if self.on_error_callback:
await self.on_error_callback("start_failed")
return False
async def stop(self):
logger.info("Stopping FFmpeg manager")
async with self.state_lock:
async with self._state_lock:
if self.state == FFmpegState.STOPPED:
return
self.state = FFmpegState.STOPPED
# Cancel background tasks
if self._write_task and not self._write_task.done():
self._write_task.cancel()
try:
await self._write_task
except asyncio.CancelledError:
pass
if self._monitor_task and not self._monitor_task.done():
self._monitor_task.cancel()
try:
await self._monitor_task
except asyncio.CancelledError:
pass
await self._stop_process()
while not self._write_queue.empty():
try:
self._write_queue.get_nowait()
except asyncio.QueueEmpty:
break
logger.info("FFmpeg manager stopped")
if self.process:
if self.process.stdin and not self.process.stdin.is_closing():
self.process.stdin.close()
await self.process.stdin.wait_closed()
await self.process.wait()
self.process = None
if self._stderr_task:
self._stderr_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._stderr_task
logger.info("FFmpeg stopped.")
async def write_data(self, data: bytes) -> bool:
current_state = await self.get_state()
if current_state != FFmpegState.RUNNING:
logger.warning(f"Cannot write data, FFmpeg state: {current_state}")
return False
async with self._state_lock:
if self.state != FFmpegState.RUNNING:
logger.warning(f"Cannot write, FFmpeg state: {self.state}")
return False
try:
# Use nowait to avoid blocking
self._write_queue.put_nowait(data)
self.process.stdin.write(data)
await self.process.stdin.drain()
return True
except asyncio.QueueFull:
logger.warning("FFmpeg write queue is full, dropping data")
except Exception as e:
logger.error(f"Error writing to FFmpeg: {e}")
if self.on_error_callback:
await self.on_error_callback("write_queue_full")
await self.on_error_callback("write_error")
return False
async def read_data(self, size: int) -> Optional[bytes]:
"""Read data from FFmpeg stdout (non-blocking)."""
if not self.process or self.process.poll() is not None:
return None
async with self._state_lock:
if self.state != FFmpegState.RUNNING:
logger.warning(f"Cannot read, FFmpeg state: {self.state}")
return None
try:
loop = asyncio.get_event_loop()
data = await asyncio.wait_for(
loop.run_in_executor(self.executor, self.process.stdout.read, size),
self.process.stdout.read(size),
timeout=5.0
)
if data:
self.last_activity = time.time()
return data
except asyncio.TimeoutError:
logger.warning("FFmpeg read timeout")
logger.warning("FFmpeg read timeout.")
return None
except Exception as e:
logger.error(f"Error reading from FFmpeg: {e}")
if self.on_error_callback:
await self.on_error_callback("read_error")
return None
async def get_state(self) -> FFmpegState:
"""Get the current FFmpeg state."""
async with self.state_lock:
async with self._state_lock:
return self.state
async def restart(self, is_external_kill: bool = False) -> bool:
"""Restart the FFmpeg process."""
async with self._restart_lock:
if self._restart_in_progress:
logger.info("Restart already in progress, skipping")
async def restart(self) -> bool:
async with self._state_lock:
if self.state == FFmpegState.RESTARTING:
logger.warning("Restart already in progress.")
return False
self._restart_in_progress = True
self.state = FFmpegState.RESTARTING
logger.info("Restarting FFmpeg...")
try:
logger.warning("Restarting FFmpeg process")
async with self.state_lock:
if self.state == FFmpegState.RESTARTING:
logger.warning("FFmpeg is already restarting")
return False
prev_state = self.state
self.state = FFmpegState.RESTARTING
if is_external_kill:
logger.warning("External kill detected, resetting circuit breaker. (Check the exit code) ")
self.circuit_breaker.failure_count = 0
self.circuit_breaker.is_open = False
logger.warning("Clearing write queue")
while not self._write_queue.empty():
try:
self._write_queue.get_nowait()
except asyncio.QueueEmpty:
break
await self._stop_process()
await asyncio.sleep(self.restart_delay)
for attempt in range(self.max_retries):
if not self.circuit_breaker.can_attempt():
logger.error("Circuit breaker is open, cannot restart")
async with self.state_lock:
self.state = FFmpegState.FAILED
return False
success = await self._start_process()
if success:
self.circuit_breaker.record_success()
async with self.state_lock:
self.state = FFmpegState.RUNNING
logger.info("FFmpeg restarted successfully")
return True
self.circuit_breaker.record_failure()
if attempt < self.max_retries - 1:
delay = self.restart_delay * (2 ** attempt) # test
logger.warning(f"Restart attempt {attempt + 1} failed, waiting {delay}s")
await asyncio.sleep(delay)
logger.error("Failed to restart FFmpeg after all attempts")
async with self.state_lock:
await self.stop()
await asyncio.sleep(1) # short delay before restarting
return await self.start()
except Exception as e:
logger.error(f"Error during FFmpeg restart: {e}")
async with self._state_lock:
self.state = FFmpegState.FAILED
if self.on_error_callback:
await self.on_error_callback("restart_failed")
return False
finally:
async with self._restart_lock:
self._restart_in_progress = False
async def _start_process(self) -> bool:
"""Start the FFmpeg process."""
async def _drain_stderr(self):
try:
self.process = (
ffmpeg.input("pipe:0", format="webm")
.output("pipe:1", format="s16le", acodec="pcm_s16le",
ac=self.channels, ar=str(self.sample_rate))
.run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True)
)
await asyncio.sleep(0.1)
if self.process.poll() is not None:
logger.error("FFmpeg process died immediately after starting")
return False
self.last_activity = time.time()
logger.info("FFmpeg process started successfully")
return True
except FileNotFoundError:
error = """
FFmpeg is not installed or not found in your system's PATH.
Please install FFmpeg to enable audio processing.
Installation instructions:
# Ubuntu/Debian:
sudo apt update && sudo apt install ffmpeg
# macOS (using Homebrew):
brew install ffmpeg
# Windows:
# 1. Download the latest static build from https://ffmpeg.org/download.html
# 2. Extract the archive (e.g., to C:\\FFmpeg).
# 3. Add the 'bin' directory (e.g., C:\\FFmpeg\\bin) to your system's PATH environment variable.
After installation, please restart the application.
"""
logger.error(error)
return False
while True:
line = await self.process.stderr.readline()
if not line:
break
logger.debug(f"FFmpeg stderr: {line.decode(errors='ignore').strip()}")
except asyncio.CancelledError:
logger.info("FFmpeg stderr drain task cancelled.")
except Exception as e:
logger.error(f"Failed to start FFmpeg process: {e}")
logger.error(traceback.format_exc())
return False
async def _stop_process(self):
"""Stop the FFmpeg process gracefully."""
if not self.process:
return
try:
# Close stdin
if self.process.stdin and not self.process.stdin.closed:
loop = asyncio.get_event_loop()
await loop.run_in_executor(self.executor, self.process.stdin.close)
# Wait for process to terminate
if self.process.poll() is None:
try:
await asyncio.wait_for(
asyncio.get_event_loop().run_in_executor(
self.executor, self.process.wait
),
timeout=5.0
)
except asyncio.TimeoutError:
logger.warning("FFmpeg did not terminate gracefully, killing")
self.process.kill()
await asyncio.get_event_loop().run_in_executor(
self.executor, self.process.wait
)
logger.info("FFmpeg process stopped")
except Exception as e:
logger.error(f"Error stopping FFmpeg process: {e}")
logger.error(traceback.format_exc())
finally:
self.process = None
async def _write_worker(self):
"""Background worker to write data to FFmpeg."""
logger.info("FFmpeg write worker started")
consecutive_failures = 0
while True:
try:
# Get data from queue
data = await self._write_queue.get()
# Check if we're in a restart state
current_state = await self.get_state()
if current_state == FFmpegState.RESTARTING:
# Drop data during restart
continue
if not self.process or self.process.poll() is not None:
consecutive_failures += 1
# If we have multiple consecutive failures, it might be an external kill
if consecutive_failures > 3:
logger.warning("Multiple write failures detected, possible external kill")
if self.on_error_callback:
await self.on_error_callback("process_not_available")
# Let the health monitor handle the restart
await asyncio.sleep(0.5)
continue
# Write data in executor to avoid blocking
loop = asyncio.get_event_loop()
try:
await asyncio.wait_for(
loop.run_in_executor(
self.executor,
self._write_to_process,
data
),
timeout=5.0
)
self.last_activity = time.time()
consecutive_failures = 0 # Reset on success
except asyncio.TimeoutError:
logger.error("FFmpeg write timeout")
if self.on_error_callback:
await self.on_error_callback("write_timeout")
# Trigger restart only if not already restarting
if await self.get_state() != FFmpegState.RESTARTING:
asyncio.create_task(self.restart())
except Exception as e:
logger.error(f"Error writing to FFmpeg: {e}")
if self.on_error_callback:
await self.on_error_callback("write_error")
# Trigger restart only if not already restarting
if await self.get_state() != FFmpegState.RESTARTING:
asyncio.create_task(self.restart())
except asyncio.CancelledError:
logger.info("FFmpeg write worker cancelled")
break
except Exception as e:
logger.error(f"Unexpected error in write worker: {e}")
logger.error(traceback.format_exc())
await asyncio.sleep(1)
def _write_to_process(self, data: bytes):
if self.process and self.process.stdin and not self.process.stdin.closed:
self.process.stdin.write(data)
self.process.stdin.flush()
async def _monitor_health(self):
logger.info("FFmpeg health monitor started")
last_known_pid = None
while True:
try:
await asyncio.sleep(5)
current_state = await self.get_state()
if current_state not in [FFmpegState.RUNNING, FFmpegState.RESTARTING]:
continue
if not self.process or self.process.poll() is not None:
if current_state != FFmpegState.RESTARTING:
logger.error(f"FFmpeg process died unexpectedly with exit code: {self.process.poll()}")
is_external_kill = last_known_pid is not None
if is_external_kill:
logger.info("Detected possible external kill (e.g., pkill)")
if self.on_error_callback:
await self.on_error_callback("process_died")
await self.restart(is_external_kill=is_external_kill)
continue
if self.process and hasattr(self.process, 'pid'):
last_known_pid = self.process.pid
idle_time = time.time() - self.last_activity
if idle_time > 30:
logger.warning(f"FFmpeg idle for {idle_time:.1f}s")
if idle_time > 60 and current_state != FFmpegState.RESTARTING:
logger.error("FFmpeg idle timeout, restarting")
if self.on_error_callback:
await self.on_error_callback("idle_timeout")
await self.restart()
except asyncio.CancelledError:
logger.info("FFmpeg health monitor cancelled")
break
except Exception as e:
logger.error(f"Error in health monitor: {e}")
logger.error(traceback.format_exc())
await asyncio.sleep(5)
async def _read_stderr(self):
"""Reads from FFmpeg's stderr to prevent buffer overflow."""
logger.info("FFmpeg stderr reader started.")
while not self._stop_requested.is_set():
if not self.process or not self.process.stderr:
await asyncio.sleep(0.5)
continue
try:
loop = asyncio.get_event_loop()
line = await asyncio.wait_for(
loop.run_in_executor(self.executor, self.process.stderr.readline),
timeout=1.0
)
if line:
logger.debug(f"FFmpeg stderr: {line.decode(errors='ignore').strip()}")
except asyncio.TimeoutError:
continue
except asyncio.CancelledError:
logger.info("FFmpeg stderr reader cancelled.")
break
except Exception as e:
if not self._stop_requested.is_set():
logger.error(f"Error reading FFmpeg stderr: {e}")
await asyncio.sleep(1)
def __del__(self):
"""Cleanup resources."""
self.executor.shutdown(wait=False)
logger.error(f"Error draining FFmpeg stderr: {e}")