From 84b09bb2cc6cbbbce4078d47c0b45d97068722fa Mon Sep 17 00:00:00 2001 From: Quentin Fuxa Date: Fri, 14 Feb 2025 17:22:03 +0000 Subject: [PATCH] handle ffmpeg timeouts > 5s --- whisper_fastapi_online_server.py | 89 +++++++++++++++++++------------- 1 file changed, 53 insertions(+), 36 deletions(-) diff --git a/whisper_fastapi_online_server.py b/whisper_fastapi_online_server.py index 1ea8731..c8b8436 100644 --- a/whisper_fastapi_online_server.py +++ b/whisper_fastapi_online_server.py @@ -108,18 +108,30 @@ async def websocket_endpoint(websocket: WebSocket): await websocket.accept() print("WebSocket connection opened.") - ffmpeg_process = await start_ffmpeg_decoder() + ffmpeg_process = None pcm_buffer = bytearray() - print("Loading online.") online = online_factory(args, asr, tokenizer) - print("Online loaded.") + diarization = DiartDiarization(SAMPLE_RATE) if args.diarization else None - if args.diarization: - diarization = DiartDiarization(SAMPLE_RATE) + async def restart_ffmpeg(): + nonlocal ffmpeg_process, online, diarization, pcm_buffer + if ffmpeg_process: + try: + ffmpeg_process.kill() + await asyncio.get_event_loop().run_in_executor(None, ffmpeg_process.wait) + except Exception as e: + print(f"Error killing FFmpeg process: {e}") + ffmpeg_process = await start_ffmpeg_decoder() + pcm_buffer = bytearray() + online = online_factory(args, asr, tokenizer) + if args.diarization: + diarization = DiartDiarization(SAMPLE_RATE) + print("FFmpeg process started.") + + await restart_ffmpeg() - # Continuously read decoded PCM from ffmpeg stdout in a background task async def ffmpeg_stdout_reader(): - nonlocal pcm_buffer + nonlocal ffmpeg_process, online, diarization, pcm_buffer loop = asyncio.get_event_loop() full_transcription = "" beg = time() @@ -131,9 +143,23 @@ async def websocket_endpoint(websocket: WebSocket): elapsed_time = math.floor((time() - beg) * 10) / 10 # Round to 0.1 sec ffmpeg_buffer_from_duration = max(int(32000 * elapsed_time), 4096) beg = time() - chunk = await loop.run_in_executor( - None, ffmpeg_process.stdout.read, ffmpeg_buffer_from_duration - ) + + # Read chunk with timeout + try: + chunk = await asyncio.wait_for( + loop.run_in_executor( + None, ffmpeg_process.stdout.read, ffmpeg_buffer_from_duration + ), + timeout=5.0 + ) + except asyncio.TimeoutError: + print("FFmpeg read timeout. Restarting...") + await restart_ffmpeg() + full_transcription = "" + chunk_history = [] + beg = time() + continue # Skip processing and read from new process + if not chunk: print("FFmpeg stdout closed.") break @@ -151,13 +177,13 @@ async def websocket_endpoint(websocket: WebSocket): if transcription: chunk_history.append({ - "beg": transcription.start, - "end": transcription.end, - "text": transcription.text, - "speaker": "0" + "beg": transcription.start, + "end": transcription.end, + "text": transcription.text, + "speaker": "0" }) - - full_transcription += transcription.text + + full_transcription += transcription.text if transcription else "" buffer = online.get_buffer() if buffer in full_transcription: # With VAC, the buffer is not updated until the next chunk is processed @@ -200,37 +226,28 @@ async def websocket_endpoint(websocket: WebSocket): while True: # Receive incoming WebM audio chunks from the client message = await websocket.receive_bytes() - # Pass them to ffmpeg via stdin - ffmpeg_process.stdin.write(message) - ffmpeg_process.stdin.flush() - + try: + ffmpeg_process.stdin.write(message) + ffmpeg_process.stdin.flush() + except (BrokenPipeError, AttributeError) as e: + print(f"Error writing to FFmpeg: {e}. Restarting...") + await restart_ffmpeg() + ffmpeg_process.stdin.write(message) + ffmpeg_process.stdin.flush() except WebSocketDisconnect: - print("WebSocket connection closed.") - except Exception as e: - print(f"Error in websocket loop: {e}") + print("WebSocket disconnected.") finally: - # Clean up ffmpeg and the reader task + stdout_reader_task.cancel() try: ffmpeg_process.stdin.close() + ffmpeg_process.wait() except: pass - stdout_reader_task.cancel() - - try: - ffmpeg_process.stdout.close() - except: - pass - - ffmpeg_process.wait() - del online - if args.diarization: - # Stop Diart diarization.close() - if __name__ == "__main__": import uvicorn