From 083d5b2f44c08e7d451cd8d3ab4075f1615d28e6 Mon Sep 17 00:00:00 2001 From: Quentin Fuxa Date: Wed, 7 May 2025 10:55:44 +0200 Subject: [PATCH] uses sentinel object when end of transcription, to properly terminate tasks --- whisperlivekit/audio_processor.py | 205 ++++++++++++++++++++++-------- 1 file changed, 154 insertions(+), 51 deletions(-) diff --git a/whisperlivekit/audio_processor.py b/whisperlivekit/audio_processor.py index 2fc5ce5..903a43f 100644 --- a/whisperlivekit/audio_processor.py +++ b/whisperlivekit/audio_processor.py @@ -15,6 +15,8 @@ logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %( logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) +SENTINEL = object() # unique sentinel object for end of stream marker + def format_time(seconds: float) -> str: """Format seconds as HH:MM:SS.""" return str(timedelta(seconds=int(seconds))) @@ -41,8 +43,9 @@ class AudioProcessor: self.last_ffmpeg_activity = time() self.ffmpeg_health_check_interval = 5 self.ffmpeg_max_idle_time = 10 - + # State management + self.is_stopping = False self.tokens = [] self.buffer_transcription = "" self.buffer_diarization = "" @@ -62,6 +65,13 @@ class AudioProcessor: self.transcription_queue = asyncio.Queue() if self.args.transcription else None self.diarization_queue = asyncio.Queue() if self.args.diarization else None self.pcm_buffer = bytearray() + + # Task references + self.transcription_task = None + self.diarization_task = None + self.ffmpeg_reader_task = None + self.watchdog_task = None + self.all_tasks_for_cleanup = [] # Initialize transcription engine if enabled if self.args.transcription: @@ -210,7 +220,7 @@ class AudioProcessor: self.last_ffmpeg_activity = time() if not chunk: - logger.info("FFmpeg stdout closed.") + logger.info("FFmpeg stdout closed, no more data to read.") break self.pcm_buffer.extend(chunk) @@ -245,6 +255,15 @@ class AudioProcessor: logger.warning(f"Exception in ffmpeg_stdout_reader: {e}") logger.warning(f"Traceback: {traceback.format_exc()}") break + + logger.info("FFmpeg stdout processing finished. Signaling downstream processors.") + if self.args.transcription and self.transcription_queue: + await self.transcription_queue.put(SENTINEL) + logger.debug("Sentinel put into transcription_queue.") + if self.args.diarization and self.diarization_queue: + await self.diarization_queue.put(SENTINEL) + logger.debug("Sentinel put into diarization_queue.") + async def transcription_processor(self): """Process audio chunks for transcription.""" @@ -254,7 +273,16 @@ class AudioProcessor: while True: try: pcm_array = await self.transcription_queue.get() + if pcm_array is SENTINEL: + logger.debug("Transcription processor received sentinel. Finishing.") + self.transcription_queue.task_done() + break + if not self.online: # Should not happen if queue is used + logger.warning("Transcription processor: self.online not initialized.") + self.transcription_queue.task_done() + continue + logger.info(f"{len(self.online.audio_buffer) / self.online.SAMPLING_RATE} seconds of audio to process.") # Process transcription @@ -278,12 +306,15 @@ class AudioProcessor: await self.update_transcription( new_tokens, buffer, end_buffer, self.full_transcription, self.sep ) + self.transcription_queue.task_done() except Exception as e: logger.warning(f"Exception in transcription_processor: {e}") logger.warning(f"Traceback: {traceback.format_exc()}") - finally: - self.transcription_queue.task_done() + if 'pcm_array' in locals() and pcm_array is not SENTINEL : # Check if pcm_array was assigned from queue + self.transcription_queue.task_done() + logger.info("Transcription processor task finished.") + async def diarization_processor(self, diarization_obj): """Process audio chunks for speaker diarization.""" @@ -292,6 +323,10 @@ class AudioProcessor: while True: try: pcm_array = await self.diarization_queue.get() + if pcm_array is SENTINEL: + logger.debug("Diarization processor received sentinel. Finishing.") + self.diarization_queue.task_done() + break # Process diarization await diarization_obj.diarize(pcm_array) @@ -303,12 +338,15 @@ class AudioProcessor: ) await self.update_diarization(new_end, buffer_diarization) + self.diarization_queue.task_done() except Exception as e: logger.warning(f"Exception in diarization_processor: {e}") logger.warning(f"Traceback: {traceback.format_exc()}") - finally: - self.diarization_queue.task_done() + if 'pcm_array' in locals() and pcm_array is not SENTINEL: + self.diarization_queue.task_done() + logger.info("Diarization processor task finished.") + async def results_formatter(self): """Format processing results for output.""" @@ -398,6 +436,19 @@ class AudioProcessor: yield response self.last_response_content = response_content + # Check for termination condition + if self.is_stopping: + all_processors_done = True + if self.args.transcription and self.transcription_task and not self.transcription_task.done(): + all_processors_done = False + if self.args.diarization and self.diarization_task and not self.diarization_task.done(): + all_processors_done = False + + if all_processors_done: + logger.info("Results formatter: All upstream processors are done and in stopping state. Terminating.") + final_state = await self.get_current_state() + return + await asyncio.sleep(0.1) # Avoid overwhelming the client except Exception as e: @@ -407,65 +458,117 @@ class AudioProcessor: async def create_tasks(self): """Create and start processing tasks.""" - - tasks = [] + self.all_tasks_for_cleanup = [] + processing_tasks_for_watchdog = [] + if self.args.transcription and self.online: - tasks.append(asyncio.create_task(self.transcription_processor())) + self.transcription_task = asyncio.create_task(self.transcription_processor()) + self.all_tasks_for_cleanup.append(self.transcription_task) + processing_tasks_for_watchdog.append(self.transcription_task) if self.args.diarization and self.diarization: - tasks.append(asyncio.create_task(self.diarization_processor(self.diarization))) + self.diarization_task = asyncio.create_task(self.diarization_processor(self.diarization)) + self.all_tasks_for_cleanup.append(self.diarization_task) + processing_tasks_for_watchdog.append(self.diarization_task) - tasks.append(asyncio.create_task(self.ffmpeg_stdout_reader())) - - # Monitor overall system health - async def watchdog(): - while True: - try: - await asyncio.sleep(10) # Check every 10 seconds instead of 60 - - current_time = time() - # Check for stalled tasks - for i, task in enumerate(tasks): - if task.done(): - exc = task.exception() if task.done() else None - task_name = task.get_name() if hasattr(task, 'get_name') else f"Task {i}" - logger.error(f"{task_name} unexpectedly completed with exception: {exc}") - - # Check for FFmpeg process health with shorter thresholds - ffmpeg_idle_time = current_time - self.last_ffmpeg_activity - if ffmpeg_idle_time > 15: # 15 seconds instead of 180 - logger.warning(f"FFmpeg idle for {ffmpeg_idle_time:.2f}s - may need attention") - - # Force restart after 30 seconds of inactivity (instead of 600) - if ffmpeg_idle_time > 30: - logger.error("FFmpeg idle for too long, forcing restart") - await self.restart_ffmpeg() - - except Exception as e: - logger.error(f"Error in watchdog task: {e}") + self.ffmpeg_reader_task = asyncio.create_task(self.ffmpeg_stdout_reader()) + self.all_tasks_for_cleanup.append(self.ffmpeg_reader_task) + processing_tasks_for_watchdog.append(self.ffmpeg_reader_task) - tasks.append(asyncio.create_task(watchdog())) - self.tasks = tasks + # Monitor overall system health + self.watchdog_task = asyncio.create_task(self.watchdog(processing_tasks_for_watchdog)) + self.all_tasks_for_cleanup.append(self.watchdog_task) return self.results_formatter() + + async def watchdog(self, tasks_to_monitor): + """Monitors the health of critical processing tasks.""" + while True: + try: + await asyncio.sleep(10) + current_time = time() + + for i, task in enumerate(tasks_to_monitor): + if task.done(): + exc = task.exception() + task_name = task.get_name() if hasattr(task, 'get_name') else f"Monitored Task {i}" + if exc: + logger.error(f"{task_name} unexpectedly completed with exception: {exc}") + else: + logger.info(f"{task_name} completed normally.") + + ffmpeg_idle_time = current_time - self.last_ffmpeg_activity + if ffmpeg_idle_time > 15: + logger.warning(f"FFmpeg idle for {ffmpeg_idle_time:.2f}s - may need attention.") + if ffmpeg_idle_time > 30 and not self.is_stopping: + logger.error("FFmpeg idle for too long and not in stopping phase, forcing restart.") + await self.restart_ffmpeg() + except asyncio.CancelledError: + logger.info("Watchdog task cancelled.") + break + except Exception as e: + logger.error(f"Error in watchdog task: {e}", exc_info=True) async def cleanup(self): """Clean up resources when processing is complete.""" - for task in self.tasks: - task.cancel() + logger.info("Starting cleanup of AudioProcessor resources.") + for task in self.all_tasks_for_cleanup: + if task and not task.done(): + task.cancel() + + created_tasks = [t for t in self.all_tasks_for_cleanup if t] + if created_tasks: + await asyncio.gather(*created_tasks, return_exceptions=True) + logger.info("All processing tasks cancelled or finished.") + + if self.ffmpeg_process: + if self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed: + try: + self.ffmpeg_process.stdin.close() + except Exception as e: + logger.warning(f"Error closing ffmpeg stdin during cleanup: {e}") - try: - await asyncio.gather(*self.tasks, return_exceptions=True) - self.ffmpeg_process.stdin.close() - self.ffmpeg_process.wait() - except Exception as e: - logger.warning(f"Error during cleanup: {e}") - - if self.args.diarization and hasattr(self, 'diarization'): + # Wait for ffmpeg process to terminate + if self.ffmpeg_process.poll() is None: # Check if process is still running + logger.info("Waiting for FFmpeg process to terminate...") + try: + # Run wait in executor to avoid blocking async loop + await asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait, 5.0) # 5s timeout + except Exception as e: # subprocess.TimeoutExpired is not directly caught by asyncio.wait_for with run_in_executor + logger.warning(f"FFmpeg did not terminate gracefully, killing. Error: {e}") + self.ffmpeg_process.kill() + await asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait) # Wait for kill + logger.info("FFmpeg process terminated.") + + if self.args.diarization and hasattr(self, 'diarization') and hasattr(self.diarization, 'close'): self.diarization.close() + logger.info("AudioProcessor cleanup complete.") + async def process_audio(self, message): """Process incoming audio data.""" + # If already stopping or stdin is closed, ignore further audio, especially residual chunks. + if self.is_stopping or (self.ffmpeg_process and self.ffmpeg_process.stdin and self.ffmpeg_process.stdin.closed): + logger.warning(f"AudioProcessor is stopping or stdin is closed. Ignoring incoming audio message (length: {len(message)}).") + if not message and self.ffmpeg_process and self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed: + logger.info("Received empty message while already in stopping state; ensuring stdin is closed.") + try: + self.ffmpeg_process.stdin.close() + except Exception as e: + logger.warning(f"Error closing ffmpeg stdin on redundant stop signal during stopping state: {e}") + return + + if not message: # primary signal to start stopping + logger.info("Empty audio message received, initiating stop sequence.") + self.is_stopping = True + if self.ffmpeg_process and self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed: + try: + self.ffmpeg_process.stdin.close() + logger.info("FFmpeg stdin closed due to primary stop signal.") + except Exception as e: + logger.warning(f"Error closing ffmpeg stdin on stop: {e}") + return + retry_count = 0 max_retries = 3 @@ -517,4 +620,4 @@ class AudioProcessor: else: logger.error("Maximum retries reached for FFmpeg process") await self.restart_ffmpeg() - return \ No newline at end of file + return