mirror of
https://github.com/QuentinFuxa/WhisperLiveKit.git
synced 2026-03-07 22:33:36 +00:00
uses sentinel object when end of transcription, to properly terminate tasks
This commit is contained in:
@@ -15,6 +15,8 @@ logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
|
||||
SENTINEL = object() # unique sentinel object for end of stream marker
|
||||
|
||||
def format_time(seconds: float) -> str:
|
||||
"""Format seconds as HH:MM:SS."""
|
||||
return str(timedelta(seconds=int(seconds)))
|
||||
@@ -41,8 +43,9 @@ class AudioProcessor:
|
||||
self.last_ffmpeg_activity = time()
|
||||
self.ffmpeg_health_check_interval = 5
|
||||
self.ffmpeg_max_idle_time = 10
|
||||
|
||||
|
||||
# State management
|
||||
self.is_stopping = False
|
||||
self.tokens = []
|
||||
self.buffer_transcription = ""
|
||||
self.buffer_diarization = ""
|
||||
@@ -62,6 +65,13 @@ class AudioProcessor:
|
||||
self.transcription_queue = asyncio.Queue() if self.args.transcription else None
|
||||
self.diarization_queue = asyncio.Queue() if self.args.diarization else None
|
||||
self.pcm_buffer = bytearray()
|
||||
|
||||
# Task references
|
||||
self.transcription_task = None
|
||||
self.diarization_task = None
|
||||
self.ffmpeg_reader_task = None
|
||||
self.watchdog_task = None
|
||||
self.all_tasks_for_cleanup = []
|
||||
|
||||
# Initialize transcription engine if enabled
|
||||
if self.args.transcription:
|
||||
@@ -210,7 +220,7 @@ class AudioProcessor:
|
||||
self.last_ffmpeg_activity = time()
|
||||
|
||||
if not chunk:
|
||||
logger.info("FFmpeg stdout closed.")
|
||||
logger.info("FFmpeg stdout closed, no more data to read.")
|
||||
break
|
||||
|
||||
self.pcm_buffer.extend(chunk)
|
||||
@@ -245,6 +255,15 @@ class AudioProcessor:
|
||||
logger.warning(f"Exception in ffmpeg_stdout_reader: {e}")
|
||||
logger.warning(f"Traceback: {traceback.format_exc()}")
|
||||
break
|
||||
|
||||
logger.info("FFmpeg stdout processing finished. Signaling downstream processors.")
|
||||
if self.args.transcription and self.transcription_queue:
|
||||
await self.transcription_queue.put(SENTINEL)
|
||||
logger.debug("Sentinel put into transcription_queue.")
|
||||
if self.args.diarization and self.diarization_queue:
|
||||
await self.diarization_queue.put(SENTINEL)
|
||||
logger.debug("Sentinel put into diarization_queue.")
|
||||
|
||||
|
||||
async def transcription_processor(self):
|
||||
"""Process audio chunks for transcription."""
|
||||
@@ -254,7 +273,16 @@ class AudioProcessor:
|
||||
while True:
|
||||
try:
|
||||
pcm_array = await self.transcription_queue.get()
|
||||
if pcm_array is SENTINEL:
|
||||
logger.debug("Transcription processor received sentinel. Finishing.")
|
||||
self.transcription_queue.task_done()
|
||||
break
|
||||
|
||||
if not self.online: # Should not happen if queue is used
|
||||
logger.warning("Transcription processor: self.online not initialized.")
|
||||
self.transcription_queue.task_done()
|
||||
continue
|
||||
|
||||
logger.info(f"{len(self.online.audio_buffer) / self.online.SAMPLING_RATE} seconds of audio to process.")
|
||||
|
||||
# Process transcription
|
||||
@@ -278,12 +306,15 @@ class AudioProcessor:
|
||||
await self.update_transcription(
|
||||
new_tokens, buffer, end_buffer, self.full_transcription, self.sep
|
||||
)
|
||||
self.transcription_queue.task_done()
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Exception in transcription_processor: {e}")
|
||||
logger.warning(f"Traceback: {traceback.format_exc()}")
|
||||
finally:
|
||||
self.transcription_queue.task_done()
|
||||
if 'pcm_array' in locals() and pcm_array is not SENTINEL : # Check if pcm_array was assigned from queue
|
||||
self.transcription_queue.task_done()
|
||||
logger.info("Transcription processor task finished.")
|
||||
|
||||
|
||||
async def diarization_processor(self, diarization_obj):
|
||||
"""Process audio chunks for speaker diarization."""
|
||||
@@ -292,6 +323,10 @@ class AudioProcessor:
|
||||
while True:
|
||||
try:
|
||||
pcm_array = await self.diarization_queue.get()
|
||||
if pcm_array is SENTINEL:
|
||||
logger.debug("Diarization processor received sentinel. Finishing.")
|
||||
self.diarization_queue.task_done()
|
||||
break
|
||||
|
||||
# Process diarization
|
||||
await diarization_obj.diarize(pcm_array)
|
||||
@@ -303,12 +338,15 @@ class AudioProcessor:
|
||||
)
|
||||
|
||||
await self.update_diarization(new_end, buffer_diarization)
|
||||
self.diarization_queue.task_done()
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Exception in diarization_processor: {e}")
|
||||
logger.warning(f"Traceback: {traceback.format_exc()}")
|
||||
finally:
|
||||
self.diarization_queue.task_done()
|
||||
if 'pcm_array' in locals() and pcm_array is not SENTINEL:
|
||||
self.diarization_queue.task_done()
|
||||
logger.info("Diarization processor task finished.")
|
||||
|
||||
|
||||
async def results_formatter(self):
|
||||
"""Format processing results for output."""
|
||||
@@ -398,6 +436,19 @@ class AudioProcessor:
|
||||
yield response
|
||||
self.last_response_content = response_content
|
||||
|
||||
# Check for termination condition
|
||||
if self.is_stopping:
|
||||
all_processors_done = True
|
||||
if self.args.transcription and self.transcription_task and not self.transcription_task.done():
|
||||
all_processors_done = False
|
||||
if self.args.diarization and self.diarization_task and not self.diarization_task.done():
|
||||
all_processors_done = False
|
||||
|
||||
if all_processors_done:
|
||||
logger.info("Results formatter: All upstream processors are done and in stopping state. Terminating.")
|
||||
final_state = await self.get_current_state()
|
||||
return
|
||||
|
||||
await asyncio.sleep(0.1) # Avoid overwhelming the client
|
||||
|
||||
except Exception as e:
|
||||
@@ -407,65 +458,117 @@ class AudioProcessor:
|
||||
|
||||
async def create_tasks(self):
|
||||
"""Create and start processing tasks."""
|
||||
|
||||
tasks = []
|
||||
self.all_tasks_for_cleanup = []
|
||||
processing_tasks_for_watchdog = []
|
||||
|
||||
if self.args.transcription and self.online:
|
||||
tasks.append(asyncio.create_task(self.transcription_processor()))
|
||||
self.transcription_task = asyncio.create_task(self.transcription_processor())
|
||||
self.all_tasks_for_cleanup.append(self.transcription_task)
|
||||
processing_tasks_for_watchdog.append(self.transcription_task)
|
||||
|
||||
if self.args.diarization and self.diarization:
|
||||
tasks.append(asyncio.create_task(self.diarization_processor(self.diarization)))
|
||||
self.diarization_task = asyncio.create_task(self.diarization_processor(self.diarization))
|
||||
self.all_tasks_for_cleanup.append(self.diarization_task)
|
||||
processing_tasks_for_watchdog.append(self.diarization_task)
|
||||
|
||||
tasks.append(asyncio.create_task(self.ffmpeg_stdout_reader()))
|
||||
|
||||
# Monitor overall system health
|
||||
async def watchdog():
|
||||
while True:
|
||||
try:
|
||||
await asyncio.sleep(10) # Check every 10 seconds instead of 60
|
||||
|
||||
current_time = time()
|
||||
# Check for stalled tasks
|
||||
for i, task in enumerate(tasks):
|
||||
if task.done():
|
||||
exc = task.exception() if task.done() else None
|
||||
task_name = task.get_name() if hasattr(task, 'get_name') else f"Task {i}"
|
||||
logger.error(f"{task_name} unexpectedly completed with exception: {exc}")
|
||||
|
||||
# Check for FFmpeg process health with shorter thresholds
|
||||
ffmpeg_idle_time = current_time - self.last_ffmpeg_activity
|
||||
if ffmpeg_idle_time > 15: # 15 seconds instead of 180
|
||||
logger.warning(f"FFmpeg idle for {ffmpeg_idle_time:.2f}s - may need attention")
|
||||
|
||||
# Force restart after 30 seconds of inactivity (instead of 600)
|
||||
if ffmpeg_idle_time > 30:
|
||||
logger.error("FFmpeg idle for too long, forcing restart")
|
||||
await self.restart_ffmpeg()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in watchdog task: {e}")
|
||||
self.ffmpeg_reader_task = asyncio.create_task(self.ffmpeg_stdout_reader())
|
||||
self.all_tasks_for_cleanup.append(self.ffmpeg_reader_task)
|
||||
processing_tasks_for_watchdog.append(self.ffmpeg_reader_task)
|
||||
|
||||
tasks.append(asyncio.create_task(watchdog()))
|
||||
self.tasks = tasks
|
||||
# Monitor overall system health
|
||||
self.watchdog_task = asyncio.create_task(self.watchdog(processing_tasks_for_watchdog))
|
||||
self.all_tasks_for_cleanup.append(self.watchdog_task)
|
||||
|
||||
return self.results_formatter()
|
||||
|
||||
async def watchdog(self, tasks_to_monitor):
|
||||
"""Monitors the health of critical processing tasks."""
|
||||
while True:
|
||||
try:
|
||||
await asyncio.sleep(10)
|
||||
current_time = time()
|
||||
|
||||
for i, task in enumerate(tasks_to_monitor):
|
||||
if task.done():
|
||||
exc = task.exception()
|
||||
task_name = task.get_name() if hasattr(task, 'get_name') else f"Monitored Task {i}"
|
||||
if exc:
|
||||
logger.error(f"{task_name} unexpectedly completed with exception: {exc}")
|
||||
else:
|
||||
logger.info(f"{task_name} completed normally.")
|
||||
|
||||
ffmpeg_idle_time = current_time - self.last_ffmpeg_activity
|
||||
if ffmpeg_idle_time > 15:
|
||||
logger.warning(f"FFmpeg idle for {ffmpeg_idle_time:.2f}s - may need attention.")
|
||||
if ffmpeg_idle_time > 30 and not self.is_stopping:
|
||||
logger.error("FFmpeg idle for too long and not in stopping phase, forcing restart.")
|
||||
await self.restart_ffmpeg()
|
||||
except asyncio.CancelledError:
|
||||
logger.info("Watchdog task cancelled.")
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"Error in watchdog task: {e}", exc_info=True)
|
||||
|
||||
async def cleanup(self):
|
||||
"""Clean up resources when processing is complete."""
|
||||
for task in self.tasks:
|
||||
task.cancel()
|
||||
logger.info("Starting cleanup of AudioProcessor resources.")
|
||||
for task in self.all_tasks_for_cleanup:
|
||||
if task and not task.done():
|
||||
task.cancel()
|
||||
|
||||
created_tasks = [t for t in self.all_tasks_for_cleanup if t]
|
||||
if created_tasks:
|
||||
await asyncio.gather(*created_tasks, return_exceptions=True)
|
||||
logger.info("All processing tasks cancelled or finished.")
|
||||
|
||||
if self.ffmpeg_process:
|
||||
if self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
|
||||
try:
|
||||
self.ffmpeg_process.stdin.close()
|
||||
except Exception as e:
|
||||
logger.warning(f"Error closing ffmpeg stdin during cleanup: {e}")
|
||||
|
||||
try:
|
||||
await asyncio.gather(*self.tasks, return_exceptions=True)
|
||||
self.ffmpeg_process.stdin.close()
|
||||
self.ffmpeg_process.wait()
|
||||
except Exception as e:
|
||||
logger.warning(f"Error during cleanup: {e}")
|
||||
|
||||
if self.args.diarization and hasattr(self, 'diarization'):
|
||||
# Wait for ffmpeg process to terminate
|
||||
if self.ffmpeg_process.poll() is None: # Check if process is still running
|
||||
logger.info("Waiting for FFmpeg process to terminate...")
|
||||
try:
|
||||
# Run wait in executor to avoid blocking async loop
|
||||
await asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait, 5.0) # 5s timeout
|
||||
except Exception as e: # subprocess.TimeoutExpired is not directly caught by asyncio.wait_for with run_in_executor
|
||||
logger.warning(f"FFmpeg did not terminate gracefully, killing. Error: {e}")
|
||||
self.ffmpeg_process.kill()
|
||||
await asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait) # Wait for kill
|
||||
logger.info("FFmpeg process terminated.")
|
||||
|
||||
if self.args.diarization and hasattr(self, 'diarization') and hasattr(self.diarization, 'close'):
|
||||
self.diarization.close()
|
||||
logger.info("AudioProcessor cleanup complete.")
|
||||
|
||||
|
||||
async def process_audio(self, message):
|
||||
"""Process incoming audio data."""
|
||||
# If already stopping or stdin is closed, ignore further audio, especially residual chunks.
|
||||
if self.is_stopping or (self.ffmpeg_process and self.ffmpeg_process.stdin and self.ffmpeg_process.stdin.closed):
|
||||
logger.warning(f"AudioProcessor is stopping or stdin is closed. Ignoring incoming audio message (length: {len(message)}).")
|
||||
if not message and self.ffmpeg_process and self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
|
||||
logger.info("Received empty message while already in stopping state; ensuring stdin is closed.")
|
||||
try:
|
||||
self.ffmpeg_process.stdin.close()
|
||||
except Exception as e:
|
||||
logger.warning(f"Error closing ffmpeg stdin on redundant stop signal during stopping state: {e}")
|
||||
return
|
||||
|
||||
if not message: # primary signal to start stopping
|
||||
logger.info("Empty audio message received, initiating stop sequence.")
|
||||
self.is_stopping = True
|
||||
if self.ffmpeg_process and self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
|
||||
try:
|
||||
self.ffmpeg_process.stdin.close()
|
||||
logger.info("FFmpeg stdin closed due to primary stop signal.")
|
||||
except Exception as e:
|
||||
logger.warning(f"Error closing ffmpeg stdin on stop: {e}")
|
||||
return
|
||||
|
||||
retry_count = 0
|
||||
max_retries = 3
|
||||
|
||||
@@ -517,4 +620,4 @@ class AudioProcessor:
|
||||
else:
|
||||
logger.error("Maximum retries reached for FFmpeg process")
|
||||
await self.restart_ffmpeg()
|
||||
return
|
||||
return
|
||||
|
||||
Reference in New Issue
Block a user