Merge pull request #46 from QuentinFuxa/solving-ffmpeg-process-freezing-unexpectedly

handle ffmpeg timeouts > 5s
This commit is contained in:
Quentin Fuxa
2025-02-14 18:22:35 +01:00
committed by GitHub

View File

@@ -108,18 +108,30 @@ async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
print("WebSocket connection opened.")
ffmpeg_process = await start_ffmpeg_decoder()
ffmpeg_process = None
pcm_buffer = bytearray()
print("Loading online.")
online = online_factory(args, asr, tokenizer)
print("Online loaded.")
diarization = DiartDiarization(SAMPLE_RATE) if args.diarization else None
if args.diarization:
diarization = DiartDiarization(SAMPLE_RATE)
async def restart_ffmpeg():
nonlocal ffmpeg_process, online, diarization, pcm_buffer
if ffmpeg_process:
try:
ffmpeg_process.kill()
await asyncio.get_event_loop().run_in_executor(None, ffmpeg_process.wait)
except Exception as e:
print(f"Error killing FFmpeg process: {e}")
ffmpeg_process = await start_ffmpeg_decoder()
pcm_buffer = bytearray()
online = online_factory(args, asr, tokenizer)
if args.diarization:
diarization = DiartDiarization(SAMPLE_RATE)
print("FFmpeg process started.")
await restart_ffmpeg()
# Continuously read decoded PCM from ffmpeg stdout in a background task
async def ffmpeg_stdout_reader():
nonlocal pcm_buffer
nonlocal ffmpeg_process, online, diarization, pcm_buffer
loop = asyncio.get_event_loop()
full_transcription = ""
beg = time()
@@ -131,9 +143,23 @@ async def websocket_endpoint(websocket: WebSocket):
elapsed_time = math.floor((time() - beg) * 10) / 10 # Round to 0.1 sec
ffmpeg_buffer_from_duration = max(int(32000 * elapsed_time), 4096)
beg = time()
chunk = await loop.run_in_executor(
None, ffmpeg_process.stdout.read, ffmpeg_buffer_from_duration
)
# Read chunk with timeout
try:
chunk = await asyncio.wait_for(
loop.run_in_executor(
None, ffmpeg_process.stdout.read, ffmpeg_buffer_from_duration
),
timeout=5.0
)
except asyncio.TimeoutError:
print("FFmpeg read timeout. Restarting...")
await restart_ffmpeg()
full_transcription = ""
chunk_history = []
beg = time()
continue # Skip processing and read from new process
if not chunk:
print("FFmpeg stdout closed.")
break
@@ -151,13 +177,13 @@ async def websocket_endpoint(websocket: WebSocket):
if transcription:
chunk_history.append({
"beg": transcription.start,
"end": transcription.end,
"text": transcription.text,
"speaker": "0"
"beg": transcription.start,
"end": transcription.end,
"text": transcription.text,
"speaker": "0"
})
full_transcription += transcription.text
full_transcription += transcription.text if transcription else ""
buffer = online.get_buffer()
if buffer in full_transcription: # With VAC, the buffer is not updated until the next chunk is processed
@@ -200,37 +226,28 @@ async def websocket_endpoint(websocket: WebSocket):
while True:
# Receive incoming WebM audio chunks from the client
message = await websocket.receive_bytes()
# Pass them to ffmpeg via stdin
ffmpeg_process.stdin.write(message)
ffmpeg_process.stdin.flush()
try:
ffmpeg_process.stdin.write(message)
ffmpeg_process.stdin.flush()
except (BrokenPipeError, AttributeError) as e:
print(f"Error writing to FFmpeg: {e}. Restarting...")
await restart_ffmpeg()
ffmpeg_process.stdin.write(message)
ffmpeg_process.stdin.flush()
except WebSocketDisconnect:
print("WebSocket connection closed.")
except Exception as e:
print(f"Error in websocket loop: {e}")
print("WebSocket disconnected.")
finally:
# Clean up ffmpeg and the reader task
stdout_reader_task.cancel()
try:
ffmpeg_process.stdin.close()
ffmpeg_process.wait()
except:
pass
stdout_reader_task.cancel()
try:
ffmpeg_process.stdout.close()
except:
pass
ffmpeg_process.wait()
del online
if args.diarization:
# Stop Diart
diarization.close()
if __name__ == "__main__":
import uvicorn