_processing_tasks_done checks task completion

This commit is contained in:
Quentin Fuxa
2025-11-05 23:34:00 +01:00
parent 41ca17acda
commit ffe5284764
3 changed files with 22 additions and 5 deletions

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "whisperlivekit"
version = "0.2.13"
version = "0.2.13.post1"
description = "Real-time speech-to-text with speaker diarization using Whisper"
readme = "README.md"
authors = [
@@ -30,7 +30,6 @@ dependencies = [
"fastapi",
"librosa",
"soundfile",
"faster-whisper",
"uvicorn",
"websockets",
"torchaudio>=2.0.0",

View File

@@ -464,7 +464,7 @@ class AudioProcessor:
yield response
self.last_response_content = response
if self.is_stopping and self.transcription_task and self.transcription_task.done() and self.diarization_task and self.diarization_task.done():
if self.is_stopping and self._processing_tasks_done():
logger.info("Results formatter: All upstream processors are done and in stopping state. Terminating.")
return
@@ -517,11 +517,16 @@ class AudioProcessor:
async def watchdog(self, tasks_to_monitor):
"""Monitors the health of critical processing tasks."""
tasks_remaining = [task for task in tasks_to_monitor if task]
while True:
try:
if not tasks_remaining:
logger.info("Watchdog task finishing: all monitored tasks completed.")
return
await asyncio.sleep(10)
for i, task in enumerate(tasks_to_monitor):
for i, task in enumerate(list(tasks_remaining)):
if task.done():
exc = task.exception()
task_name = task.get_name() if hasattr(task, 'get_name') else f"Monitored Task {i}"
@@ -529,6 +534,7 @@ class AudioProcessor:
logger.error(f"{task_name} unexpectedly completed with exception: {exc}")
else:
logger.info(f"{task_name} completed normally.")
tasks_remaining.remove(task)
except asyncio.CancelledError:
logger.info("Watchdog task cancelled.")
@@ -559,6 +565,16 @@ class AudioProcessor:
self.diarization.close()
logger.info("AudioProcessor cleanup complete.")
def _processing_tasks_done(self):
"""Return True when all active processing tasks have completed."""
tasks_to_check = [
self.transcription_task,
self.diarization_task,
self.translation_task,
self.ffmpeg_reader_task,
]
return all(task.done() for task in tasks_to_check if task)
async def process_audio(self, message):
"""Process incoming audio data."""

View File

@@ -23,7 +23,7 @@ try:
HAS_MLX_WHISPER = True
except ImportError:
if platform.system() == "Darwin" and platform.machine() == "arm64":
print(f"""{"="*50}\nMLX Whisper not found but you are on Apple Silicon. Consider installing mlx-whisper for better performance: pip install mlx-whisper\n{"="*50}""")
print(f"""{"="*50}\nMLX Whisper not found but you are on Apple Silicon. Consider installing mlx-whisper for better performance: pip install `mlx-whisper\n{"="*50}`""")
HAS_MLX_WHISPER = False
if HAS_MLX_WHISPER:
HAS_FASTER_WHISPER = False
@@ -32,6 +32,8 @@ else:
from faster_whisper import WhisperModel
HAS_FASTER_WHISPER = True
except ImportError:
if platform.system() != "Darwin":
print(f"""{"="*50}\nFaster-Whisper not found but. Consider installing faster-whisper for better performance: pip install `faster-whisper\n{"="*50}`""")
HAS_FASTER_WHISPER = False
def model_path_and_type(model_path):