19 Commits
0.1.4 ... 0.1.6

Author SHA1 Message Date
Quentin Fuxa
fa29a24abe Bump version to 0.1.6 2025-05-07 11:45:33 +02:00
Quentin Fuxa
fea3c3553c logging in ASR proc. includes internal buffer duration and transcription lag 2025-05-07 11:45:00 +02:00
Quentin Fuxa
d6d65a663b errors handling when end of transcription 2025-05-07 10:56:04 +02:00
Quentin Fuxa
083d5b2f44 uses sentinel object when end of transcription, to properly terminate tasks 2025-05-07 10:55:44 +02:00
Quentin Fuxa
8e4674b093 End of transcription : Properly sends signal back to the endpoint 2025-05-07 10:55:12 +02:00
Quentin Fuxa
bc7c32100f Mention third-party components 2025-04-14 00:21:43 +02:00
Quentin Fuxa
c4150894af Merge branch 'main' of https://github.com/QuentinFuxa/whisper_streaming_web 2025-04-13 12:11:01 +02:00
Quentin Fuxa
25bf242ce1 bump version to 0.1.5 2025-04-13 12:10:53 +02:00
Quentin Fuxa
14cc601a5c Update README.md 2025-04-13 11:07:53 +02:00
Quentin Fuxa
34d5d513fa fix typo 2025-04-12 18:22:14 +02:00
Quentin Fuxa
2ab3dac948 remove whisper_fastapi_online_server.py 2025-04-12 18:21:04 +02:00
Quentin Fuxa
b56fcffde1 Solves stdin flushes blocking IOhttps://github.com/QuentinFuxa/WhisperLiveKit/issues/110
https://github.com/QuentinFuxa/WhisperLiveKit/issues/106
https://github.com/QuentinFuxa/WhisperLiveKit/issues/90
https://github.com/QuentinFuxa/WhisperLiveKit/issues/87
https://github.com/QuentinFuxa/WhisperLiveKit/issues/81
https://github.com/QuentinFuxa/WhisperLiveKit/issues/2
2025-04-12 15:25:46 +02:00
Quentin Fuxa
2def194893 add ssl certificate and key file arguments to parser 2025-04-11 12:20:22 +02:00
Quentin Fuxa
29978da301 adds ssl possibility in basic server 2025-04-11 12:20:08 +02:00
Quentin Fuxa
b708890788 protocol default to ws 2025-04-11 12:14:14 +02:00
Quentin Fuxa
3ac4c514cf remove temp_kit method to get args. uvicorn reload to False for better perfs 2025-04-11 12:02:52 +02:00
Chris Margach
3c58bfcfa2 update readme for package launch with SSL 2025-04-10 13:47:09 +09:00
Chris Margach
d53b7a323a update sample html to use wss in case of https 2025-04-10 13:46:52 +09:00
Chris Margach
02de5993e6 allow passing of cert and key locations to uvicorn via package 2025-04-10 13:42:30 +09:00
10 changed files with 380 additions and 253 deletions

38
LICENSE
View File

@@ -1,21 +1,33 @@
MIT License MIT License
Copyright (c) 2023 ÚFAL Copyright (c) 2025 Quentin Fuxa.
Based on:
- The original work by ÚFAL. License: https://github.com/ufal/whisper_streaming/blob/main/LICENSE
- The work by Snakers4 (silero-vad). License: https://github.com/snakers4/silero-vad/blob/f6b1294cb27590fb2452899df98fb234dfef1134/LICENSE
- The work in Diart by juanmc2005. License: https://github.com/juanmc2005/diart/blob/main/LICENSE
Permission is hereby granted, free of charge, to any person obtaining a copy Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions: furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software. copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE. SOFTWARE.
---
Third-party components included in this software:
- **whisper_streaming** by ÚFAL MIT License https://github.com/ufal/whisper_streaming
- **silero-vad** by Snakers4 MIT License https://github.com/snakers4/silero-vad
- **Diart** by juanmc2005 MIT License https://github.com/juanmc2005/diart

View File

@@ -15,16 +15,16 @@
## 🚀 Overview ## 🚀 Overview
This project is based on [Whisper Streaming](https://github.com/ufal/whisper_streaming) and lets you transcribe audio directly from your browser. WhisperLiveKit provides a complete backend solution for real-time speech transcription with an example frontend that you can customize for your own needs. Everything runs locally on your machine ✨ This project is based on [Whisper Streaming](https://github.com/ufal/whisper_streaming) and lets you transcribe audio directly from your browser. WhisperLiveKit provides a complete backend solution for real-time speech transcription with a functional and simple frontend that you can customize for your own needs. Everything runs locally on your machine ✨
### 🔄 Architecture ### 🔄 Architecture
WhisperLiveKit consists of two main components: WhisperLiveKit consists of three main components:
- **Backend (Server)**: FastAPI WebSocket server that processes audio and provides real-time transcription - **Frontend**: A basic HTML & JavaScript interface that captures microphone audio and streams it to the backend via WebSockets. You can use and adapt the provided template at [whisperlivekit/web/live_transcription.html](https://github.com/QuentinFuxa/WhisperLiveKit/blob/main/whisperlivekit/web/live_transcription.html) for your specific use case.
- **Frontend Example**: Basic HTML & JavaScript implementation that demonstrates how to capture and stream audio - **Backend (Web Server)**: A FastAPI-based WebSocket server that receives streamed audio data, processes it in real time, and returns transcriptions to the frontend. This is where the WebSocket logic and routing live.
- **Core Backend (Library Logic)**: A server-agnostic core that handles audio processing, ASR, and diarization. It exposes reusable components that take in audio bytes and return transcriptions. This makes it easy to plug into any WebSocket or audio stream pipeline.
> **Note**: We recommend installing this library on the server/backend. For the frontend, you can use and adapt the provided HTML template from [whisperlivekit/web/live_transcription.html](https://github.com/QuentinFuxa/WhisperLiveKit/blob/main/whisperlivekit/web/live_transcription.html) for your specific use case.
### ✨ Key Features ### ✨ Key Features
@@ -33,13 +33,13 @@ WhisperLiveKit consists of two main components:
- **🔒 Fully Local** - All processing happens on your machine - no data sent to external servers - **🔒 Fully Local** - All processing happens on your machine - no data sent to external servers
- **📱 Multi-User Support** - Handle multiple users simultaneously with a single backend/server - **📱 Multi-User Support** - Handle multiple users simultaneously with a single backend/server
### ⚙️ Differences from [Whisper Streaming](https://github.com/ufal/whisper_streaming) ### ⚙️ Core differences from [Whisper Streaming](https://github.com/ufal/whisper_streaming)
- **Automatic Silence Chunking** Automatically chunks when no audio is detected to limit buffer size
- **Multi-User Support** Handles multiple users simultaneously by decoupling backend and online ASR - **Multi-User Support** Handles multiple users simultaneously by decoupling backend and online ASR
- **Confidence Validation** Immediately validate high-confidence tokens for faster inference
- **MLX Whisper Backend** Optimized for Apple Silicon for faster local processing - **MLX Whisper Backend** Optimized for Apple Silicon for faster local processing
- **Buffering Preview** Displays unvalidated transcription segments - **Buffering Preview** Displays unvalidated transcription segments
- **Confidence Validation** Immediately validate high-confidence tokens for faster inference
- **Apple Silicon Optimized** - MLX backend for faster local processing on Mac
## 📖 Quick Start ## 📖 Quick Start
@@ -53,6 +53,14 @@ whisperlivekit-server --model tiny.en
# Open your browser at http://localhost:8000 # Open your browser at http://localhost:8000
``` ```
### Quick Start with SSL
```bash
# You must provide a certificate and key
whisperlivekit-server -ssl-certfile public.crt --ssl-keyfile private.key
# Open your browser at https://localhost:8000
```
That's it! Start speaking and watch your words appear on screen. That's it! Start speaking and watch your words appear on screen.
## 🛠️ Installation Options ## 🛠️ Installation Options
@@ -201,6 +209,8 @@ WhisperLiveKit offers extensive configuration options:
| `--no-vad` | Disable Voice Activity Detection | `False` | | `--no-vad` | Disable Voice Activity Detection | `False` |
| `--buffer_trimming` | Buffer trimming strategy (`sentence` or `segment`) | `segment` | | `--buffer_trimming` | Buffer trimming strategy (`sentence` or `segment`) | `segment` |
| `--warmup-file` | Audio file path for model warmup | `jfk.wav` | | `--warmup-file` | Audio file path for model warmup | `jfk.wav` |
| `--ssl-certfile` | Path to the SSL certificate file (for HTTPS support) | `None` |
| `--ssl-keyfile` | Path to the SSL private key file (for HTTPS support) | `None` |
## 🔧 How It Works ## 🔧 How It Works

BIN
demo.png

Binary file not shown.

Before

Width:  |  Height:  |  Size: 424 KiB

After

Width:  |  Height:  |  Size: 438 KiB

View File

@@ -1,7 +1,7 @@
from setuptools import setup, find_packages from setuptools import setup, find_packages
setup( setup(
name="whisperlivekit", name="whisperlivekit",
version="0.1.4", version="0.1.6",
description="Real-time, Fully Local Whisper's Speech-to-Text and Speaker Diarization", description="Real-time, Fully Local Whisper's Speech-to-Text and Speaker Diarization",
long_description=open("README.md", "r", encoding="utf-8").read(), long_description=open("README.md", "r", encoding="utf-8").read(),
long_description_content_type="text/markdown", long_description_content_type="text/markdown",

View File

@@ -1,82 +0,0 @@
from contextlib import asynccontextmanager
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
from whisperlivekit import WhisperLiveKit
from whisperlivekit.audio_processor import AudioProcessor
import asyncio
import logging
import os
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logging.getLogger().setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
kit = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global kit
kit = WhisperLiveKit()
yield
app = FastAPI(lifespan=lifespan)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
async def get():
return HTMLResponse(kit.web_interface())
async def handle_websocket_results(websocket, results_generator):
"""Consumes results from the audio processor and sends them via WebSocket."""
try:
async for response in results_generator:
await websocket.send_json(response)
except Exception as e:
logger.warning(f"Error in WebSocket results handler: {e}")
@app.websocket("/asr")
async def websocket_endpoint(websocket: WebSocket):
audio_processor = AudioProcessor()
await websocket.accept()
logger.info("WebSocket connection opened.")
results_generator = await audio_processor.create_tasks()
websocket_task = asyncio.create_task(handle_websocket_results(websocket, results_generator))
try:
while True:
message = await websocket.receive_bytes()
await audio_processor.process_audio(message)
except WebSocketDisconnect:
logger.warning("WebSocket disconnected.")
finally:
websocket_task.cancel()
await audio_processor.cleanup()
logger.info("WebSocket endpoint cleaned up.")
if __name__ == "__main__":
import uvicorn
temp_kit = WhisperLiveKit(transcription=False, diarization=False)
uvicorn.run(
"whisper_fastapi_online_server:app",
host=temp_kit.args.host,
port=temp_kit.args.port,
reload=True,
log_level="info"
)

View File

@@ -15,6 +15,8 @@ logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
SENTINEL = object() # unique sentinel object for end of stream marker
def format_time(seconds: float) -> str: def format_time(seconds: float) -> str:
"""Format seconds as HH:MM:SS.""" """Format seconds as HH:MM:SS."""
return str(timedelta(seconds=int(seconds))) return str(timedelta(seconds=int(seconds)))
@@ -41,8 +43,9 @@ class AudioProcessor:
self.last_ffmpeg_activity = time() self.last_ffmpeg_activity = time()
self.ffmpeg_health_check_interval = 5 self.ffmpeg_health_check_interval = 5
self.ffmpeg_max_idle_time = 10 self.ffmpeg_max_idle_time = 10
# State management # State management
self.is_stopping = False
self.tokens = [] self.tokens = []
self.buffer_transcription = "" self.buffer_transcription = ""
self.buffer_diarization = "" self.buffer_diarization = ""
@@ -62,6 +65,13 @@ class AudioProcessor:
self.transcription_queue = asyncio.Queue() if self.args.transcription else None self.transcription_queue = asyncio.Queue() if self.args.transcription else None
self.diarization_queue = asyncio.Queue() if self.args.diarization else None self.diarization_queue = asyncio.Queue() if self.args.diarization else None
self.pcm_buffer = bytearray() self.pcm_buffer = bytearray()
# Task references
self.transcription_task = None
self.diarization_task = None
self.ffmpeg_reader_task = None
self.watchdog_task = None
self.all_tasks_for_cleanup = []
# Initialize transcription engine if enabled # Initialize transcription engine if enabled
if self.args.transcription: if self.args.transcription:
@@ -205,24 +215,12 @@ class AudioProcessor:
self.last_ffmpeg_activity = time() self.last_ffmpeg_activity = time()
continue continue
# Reduce timeout for reading from FFmpeg chunk = await loop.run_in_executor(None, self.ffmpeg_process.stdout.read, buffer_size)
try: if chunk:
chunk = await asyncio.wait_for(
loop.run_in_executor(None, self.ffmpeg_process.stdout.read, buffer_size),
timeout=5.0 # Shorter timeout (5 seconds instead of 15)
)
if chunk:
self.last_ffmpeg_activity = time()
except asyncio.TimeoutError:
logger.warning("FFmpeg read timeout. Restarting...")
await self.restart_ffmpeg()
beg = time()
self.last_ffmpeg_activity = time() self.last_ffmpeg_activity = time()
continue
if not chunk: if not chunk:
logger.info("FFmpeg stdout closed.") logger.info("FFmpeg stdout closed, no more data to read.")
break break
self.pcm_buffer.extend(chunk) self.pcm_buffer.extend(chunk)
@@ -233,7 +231,7 @@ class AudioProcessor:
self.convert_pcm_to_float(self.pcm_buffer).copy() self.convert_pcm_to_float(self.pcm_buffer).copy()
) )
# Process when we have enough data # Process when enough data
if len(self.pcm_buffer) >= self.bytes_per_sec: if len(self.pcm_buffer) >= self.bytes_per_sec:
if len(self.pcm_buffer) > self.max_bytes_per_sec: if len(self.pcm_buffer) > self.max_bytes_per_sec:
logger.warning( logger.warning(
@@ -257,6 +255,15 @@ class AudioProcessor:
logger.warning(f"Exception in ffmpeg_stdout_reader: {e}") logger.warning(f"Exception in ffmpeg_stdout_reader: {e}")
logger.warning(f"Traceback: {traceback.format_exc()}") logger.warning(f"Traceback: {traceback.format_exc()}")
break break
logger.info("FFmpeg stdout processing finished. Signaling downstream processors.")
if self.args.transcription and self.transcription_queue:
await self.transcription_queue.put(SENTINEL)
logger.debug("Sentinel put into transcription_queue.")
if self.args.diarization and self.diarization_queue:
await self.diarization_queue.put(SENTINEL)
logger.debug("Sentinel put into diarization_queue.")
async def transcription_processor(self): async def transcription_processor(self):
"""Process audio chunks for transcription.""" """Process audio chunks for transcription."""
@@ -266,8 +273,23 @@ class AudioProcessor:
while True: while True:
try: try:
pcm_array = await self.transcription_queue.get() pcm_array = await self.transcription_queue.get()
if pcm_array is SENTINEL:
logger.debug("Transcription processor received sentinel. Finishing.")
self.transcription_queue.task_done()
break
logger.info(f"{len(self.online.audio_buffer) / self.online.SAMPLING_RATE} seconds of audio to process.") if not self.online: # Should not happen if queue is used
logger.warning("Transcription processor: self.online not initialized.")
self.transcription_queue.task_done()
continue
asr_internal_buffer_duration_s = len(self.online.audio_buffer) / self.online.SAMPLING_RATE
transcription_lag_s = max(0.0, time() - self.beg_loop - self.end_buffer)
logger.info(
f"ASR processing: internal_buffer={asr_internal_buffer_duration_s:.2f}s, "
f"lag={transcription_lag_s:.2f}s."
)
# Process transcription # Process transcription
self.online.insert_audio_chunk(pcm_array) self.online.insert_audio_chunk(pcm_array)
@@ -290,12 +312,15 @@ class AudioProcessor:
await self.update_transcription( await self.update_transcription(
new_tokens, buffer, end_buffer, self.full_transcription, self.sep new_tokens, buffer, end_buffer, self.full_transcription, self.sep
) )
self.transcription_queue.task_done()
except Exception as e: except Exception as e:
logger.warning(f"Exception in transcription_processor: {e}") logger.warning(f"Exception in transcription_processor: {e}")
logger.warning(f"Traceback: {traceback.format_exc()}") logger.warning(f"Traceback: {traceback.format_exc()}")
finally: if 'pcm_array' in locals() and pcm_array is not SENTINEL : # Check if pcm_array was assigned from queue
self.transcription_queue.task_done() self.transcription_queue.task_done()
logger.info("Transcription processor task finished.")
async def diarization_processor(self, diarization_obj): async def diarization_processor(self, diarization_obj):
"""Process audio chunks for speaker diarization.""" """Process audio chunks for speaker diarization."""
@@ -304,6 +329,10 @@ class AudioProcessor:
while True: while True:
try: try:
pcm_array = await self.diarization_queue.get() pcm_array = await self.diarization_queue.get()
if pcm_array is SENTINEL:
logger.debug("Diarization processor received sentinel. Finishing.")
self.diarization_queue.task_done()
break
# Process diarization # Process diarization
await diarization_obj.diarize(pcm_array) await diarization_obj.diarize(pcm_array)
@@ -315,12 +344,15 @@ class AudioProcessor:
) )
await self.update_diarization(new_end, buffer_diarization) await self.update_diarization(new_end, buffer_diarization)
self.diarization_queue.task_done()
except Exception as e: except Exception as e:
logger.warning(f"Exception in diarization_processor: {e}") logger.warning(f"Exception in diarization_processor: {e}")
logger.warning(f"Traceback: {traceback.format_exc()}") logger.warning(f"Traceback: {traceback.format_exc()}")
finally: if 'pcm_array' in locals() and pcm_array is not SENTINEL:
self.diarization_queue.task_done() self.diarization_queue.task_done()
logger.info("Diarization processor task finished.")
async def results_formatter(self): async def results_formatter(self):
"""Format processing results for output.""" """Format processing results for output."""
@@ -410,6 +442,19 @@ class AudioProcessor:
yield response yield response
self.last_response_content = response_content self.last_response_content = response_content
# Check for termination condition
if self.is_stopping:
all_processors_done = True
if self.args.transcription and self.transcription_task and not self.transcription_task.done():
all_processors_done = False
if self.args.diarization and self.diarization_task and not self.diarization_task.done():
all_processors_done = False
if all_processors_done:
logger.info("Results formatter: All upstream processors are done and in stopping state. Terminating.")
final_state = await self.get_current_state()
return
await asyncio.sleep(0.1) # Avoid overwhelming the client await asyncio.sleep(0.1) # Avoid overwhelming the client
except Exception as e: except Exception as e:
@@ -419,65 +464,117 @@ class AudioProcessor:
async def create_tasks(self): async def create_tasks(self):
"""Create and start processing tasks.""" """Create and start processing tasks."""
self.all_tasks_for_cleanup = []
tasks = [] processing_tasks_for_watchdog = []
if self.args.transcription and self.online: if self.args.transcription and self.online:
tasks.append(asyncio.create_task(self.transcription_processor())) self.transcription_task = asyncio.create_task(self.transcription_processor())
self.all_tasks_for_cleanup.append(self.transcription_task)
processing_tasks_for_watchdog.append(self.transcription_task)
if self.args.diarization and self.diarization: if self.args.diarization and self.diarization:
tasks.append(asyncio.create_task(self.diarization_processor(self.diarization))) self.diarization_task = asyncio.create_task(self.diarization_processor(self.diarization))
self.all_tasks_for_cleanup.append(self.diarization_task)
processing_tasks_for_watchdog.append(self.diarization_task)
tasks.append(asyncio.create_task(self.ffmpeg_stdout_reader())) self.ffmpeg_reader_task = asyncio.create_task(self.ffmpeg_stdout_reader())
self.all_tasks_for_cleanup.append(self.ffmpeg_reader_task)
# Monitor overall system health processing_tasks_for_watchdog.append(self.ffmpeg_reader_task)
async def watchdog():
while True:
try:
await asyncio.sleep(10) # Check every 10 seconds instead of 60
current_time = time()
# Check for stalled tasks
for i, task in enumerate(tasks):
if task.done():
exc = task.exception() if task.done() else None
task_name = task.get_name() if hasattr(task, 'get_name') else f"Task {i}"
logger.error(f"{task_name} unexpectedly completed with exception: {exc}")
# Check for FFmpeg process health with shorter thresholds
ffmpeg_idle_time = current_time - self.last_ffmpeg_activity
if ffmpeg_idle_time > 15: # 15 seconds instead of 180
logger.warning(f"FFmpeg idle for {ffmpeg_idle_time:.2f}s - may need attention")
# Force restart after 30 seconds of inactivity (instead of 600)
if ffmpeg_idle_time > 30:
logger.error("FFmpeg idle for too long, forcing restart")
await self.restart_ffmpeg()
except Exception as e:
logger.error(f"Error in watchdog task: {e}")
tasks.append(asyncio.create_task(watchdog())) # Monitor overall system health
self.tasks = tasks self.watchdog_task = asyncio.create_task(self.watchdog(processing_tasks_for_watchdog))
self.all_tasks_for_cleanup.append(self.watchdog_task)
return self.results_formatter() return self.results_formatter()
async def watchdog(self, tasks_to_monitor):
"""Monitors the health of critical processing tasks."""
while True:
try:
await asyncio.sleep(10)
current_time = time()
for i, task in enumerate(tasks_to_monitor):
if task.done():
exc = task.exception()
task_name = task.get_name() if hasattr(task, 'get_name') else f"Monitored Task {i}"
if exc:
logger.error(f"{task_name} unexpectedly completed with exception: {exc}")
else:
logger.info(f"{task_name} completed normally.")
ffmpeg_idle_time = current_time - self.last_ffmpeg_activity
if ffmpeg_idle_time > 15:
logger.warning(f"FFmpeg idle for {ffmpeg_idle_time:.2f}s - may need attention.")
if ffmpeg_idle_time > 30 and not self.is_stopping:
logger.error("FFmpeg idle for too long and not in stopping phase, forcing restart.")
await self.restart_ffmpeg()
except asyncio.CancelledError:
logger.info("Watchdog task cancelled.")
break
except Exception as e:
logger.error(f"Error in watchdog task: {e}", exc_info=True)
async def cleanup(self): async def cleanup(self):
"""Clean up resources when processing is complete.""" """Clean up resources when processing is complete."""
for task in self.tasks: logger.info("Starting cleanup of AudioProcessor resources.")
task.cancel() for task in self.all_tasks_for_cleanup:
if task and not task.done():
task.cancel()
created_tasks = [t for t in self.all_tasks_for_cleanup if t]
if created_tasks:
await asyncio.gather(*created_tasks, return_exceptions=True)
logger.info("All processing tasks cancelled or finished.")
if self.ffmpeg_process:
if self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
try:
self.ffmpeg_process.stdin.close()
except Exception as e:
logger.warning(f"Error closing ffmpeg stdin during cleanup: {e}")
try: # Wait for ffmpeg process to terminate
await asyncio.gather(*self.tasks, return_exceptions=True) if self.ffmpeg_process.poll() is None: # Check if process is still running
self.ffmpeg_process.stdin.close() logger.info("Waiting for FFmpeg process to terminate...")
self.ffmpeg_process.wait() try:
except Exception as e: # Run wait in executor to avoid blocking async loop
logger.warning(f"Error during cleanup: {e}") await asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait, 5.0) # 5s timeout
except Exception as e: # subprocess.TimeoutExpired is not directly caught by asyncio.wait_for with run_in_executor
if self.args.diarization and hasattr(self, 'diarization'): logger.warning(f"FFmpeg did not terminate gracefully, killing. Error: {e}")
self.ffmpeg_process.kill()
await asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait) # Wait for kill
logger.info("FFmpeg process terminated.")
if self.args.diarization and hasattr(self, 'diarization') and hasattr(self.diarization, 'close'):
self.diarization.close() self.diarization.close()
logger.info("AudioProcessor cleanup complete.")
async def process_audio(self, message): async def process_audio(self, message):
"""Process incoming audio data.""" """Process incoming audio data."""
# If already stopping or stdin is closed, ignore further audio, especially residual chunks.
if self.is_stopping or (self.ffmpeg_process and self.ffmpeg_process.stdin and self.ffmpeg_process.stdin.closed):
logger.warning(f"AudioProcessor is stopping or stdin is closed. Ignoring incoming audio message (length: {len(message)}).")
if not message and self.ffmpeg_process and self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
logger.info("Received empty message while already in stopping state; ensuring stdin is closed.")
try:
self.ffmpeg_process.stdin.close()
except Exception as e:
logger.warning(f"Error closing ffmpeg stdin on redundant stop signal during stopping state: {e}")
return
if not message: # primary signal to start stopping
logger.info("Empty audio message received, initiating stop sequence.")
self.is_stopping = True
if self.ffmpeg_process and self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
try:
self.ffmpeg_process.stdin.close()
logger.info("FFmpeg stdin closed due to primary stop signal.")
except Exception as e:
logger.warning(f"Error closing ffmpeg stdin on stop: {e}")
return
retry_count = 0 retry_count = 0
max_retries = 3 max_retries = 3
@@ -492,20 +589,41 @@ class AudioProcessor:
if not self.ffmpeg_process or not hasattr(self.ffmpeg_process, 'stdin') or self.ffmpeg_process.poll() is not None: if not self.ffmpeg_process or not hasattr(self.ffmpeg_process, 'stdin') or self.ffmpeg_process.poll() is not None:
logger.warning("FFmpeg process not available, restarting...") logger.warning("FFmpeg process not available, restarting...")
await self.restart_ffmpeg() await self.restart_ffmpeg()
self.ffmpeg_process.stdin.write(message)
self.ffmpeg_process.stdin.flush()
self.last_ffmpeg_activity = time() # Update activity timestamp
return
loop = asyncio.get_running_loop()
try:
await asyncio.wait_for(
loop.run_in_executor(None, lambda: self.ffmpeg_process.stdin.write(message)),
timeout=2.0
)
except asyncio.TimeoutError:
logger.warning("FFmpeg write operation timed out, restarting...")
await self.restart_ffmpeg()
retry_count += 1
continue
try:
await asyncio.wait_for(
loop.run_in_executor(None, self.ffmpeg_process.stdin.flush),
timeout=2.0
)
except asyncio.TimeoutError:
logger.warning("FFmpeg flush operation timed out, restarting...")
await self.restart_ffmpeg()
retry_count += 1
continue
self.last_ffmpeg_activity = time()
return
except (BrokenPipeError, AttributeError, OSError) as e: except (BrokenPipeError, AttributeError, OSError) as e:
retry_count += 1 retry_count += 1
logger.warning(f"Error writing to FFmpeg: {e}. Retry {retry_count}/{max_retries}...") logger.warning(f"Error writing to FFmpeg: {e}. Retry {retry_count}/{max_retries}...")
if retry_count < max_retries: if retry_count < max_retries:
await self.restart_ffmpeg() await self.restart_ffmpeg()
await asyncio.sleep(0.5) # Shorter pause between retries await asyncio.sleep(0.5)
else: else:
logger.error("Maximum retries reached for FFmpeg process") logger.error("Maximum retries reached for FFmpeg process")
await self.restart_ffmpeg() await self.restart_ffmpeg()
return return

View File

@@ -3,12 +3,13 @@ from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse from fastapi.responses import HTMLResponse
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from whisperlivekit import WhisperLiveKit from whisperlivekit import WhisperLiveKit, parse_args
from whisperlivekit.audio_processor import AudioProcessor from whisperlivekit.audio_processor import AudioProcessor
import asyncio import asyncio
import logging import logging
import os import os, sys
import argparse
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logging.getLogger().setLevel(logging.WARNING) logging.getLogger().setLevel(logging.WARNING)
@@ -43,6 +44,11 @@ async def handle_websocket_results(websocket, results_generator):
try: try:
async for response in results_generator: async for response in results_generator:
await websocket.send_json(response) await websocket.send_json(response)
# when the results_generator finishes it means all audio has been processed
logger.info("Results generator finished. Sending 'ready_to_stop' to client.")
await websocket.send_json({"type": "ready_to_stop"})
except WebSocketDisconnect:
logger.info("WebSocket disconnected while handling results (client likely closed connection).")
except Exception as e: except Exception as e:
logger.warning(f"Error in WebSocket results handler: {e}") logger.warning(f"Error in WebSocket results handler: {e}")
@@ -61,26 +67,58 @@ async def websocket_endpoint(websocket: WebSocket):
while True: while True:
message = await websocket.receive_bytes() message = await websocket.receive_bytes()
await audio_processor.process_audio(message) await audio_processor.process_audio(message)
except KeyError as e:
if 'bytes' in str(e):
logger.warning(f"Client has closed the connection.")
else:
logger.error(f"Unexpected KeyError in websocket_endpoint: {e}", exc_info=True)
except WebSocketDisconnect: except WebSocketDisconnect:
logger.warning("WebSocket disconnected.") logger.info("WebSocket disconnected by client during message receiving loop.")
except Exception as e:
logger.error(f"Unexpected error in websocket_endpoint main loop: {e}", exc_info=True)
finally: finally:
websocket_task.cancel() logger.info("Cleaning up WebSocket endpoint...")
if not websocket_task.done():
websocket_task.cancel()
try:
await websocket_task
except asyncio.CancelledError:
logger.info("WebSocket results handler task was cancelled.")
except Exception as e:
logger.warning(f"Exception while awaiting websocket_task completion: {e}")
await audio_processor.cleanup() await audio_processor.cleanup()
logger.info("WebSocket endpoint cleaned up.") logger.info("WebSocket endpoint cleaned up successfully.")
def main(): def main():
"""Entry point for the CLI command.""" """Entry point for the CLI command."""
import uvicorn import uvicorn
temp_kit = WhisperLiveKit(transcription=False, diarization=False) args = parse_args()
uvicorn.run( uvicorn_kwargs = {
"whisperlivekit.basic_server:app", "app": "whisperlivekit.basic_server:app",
host=temp_kit.args.host, "host":args.host,
port=temp_kit.args.port, "port":args.port,
reload=True, "reload": False,
log_level="info" "log_level": "info",
) "lifespan": "on",
}
ssl_kwargs = {}
if args.ssl_certfile or args.ssl_keyfile:
if not (args.ssl_certfile and args.ssl_keyfile):
raise ValueError("Both --ssl-certfile and --ssl-keyfile must be specified together.")
ssl_kwargs = {
"ssl_certfile": args.ssl_certfile,
"ssl_keyfile": args.ssl_keyfile
}
if ssl_kwargs:
uvicorn_kwargs = {**uvicorn_kwargs, **ssl_kwargs}
uvicorn.run(**uvicorn_kwargs)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@@ -130,6 +130,9 @@ def parse_args():
help="Set the log level", help="Set the log level",
default="DEBUG", default="DEBUG",
) )
parser.add_argument("--ssl-certfile", type=str, help="Path to the SSL certificate file.", default=None)
parser.add_argument("--ssl-keyfile", type=str, help="Path to the SSL private key file.", default=None)
args = parser.parse_args() args = parser.parse_args()

View File

@@ -308,6 +308,7 @@
let waveCtx = waveCanvas.getContext("2d"); let waveCtx = waveCanvas.getContext("2d");
let animationFrame = null; let animationFrame = null;
let waitingForStop = false; let waitingForStop = false;
let lastReceivedData = null;
waveCanvas.width = 60 * (window.devicePixelRatio || 1); waveCanvas.width = 60 * (window.devicePixelRatio || 1);
waveCanvas.height = 30 * (window.devicePixelRatio || 1); waveCanvas.height = 30 * (window.devicePixelRatio || 1);
waveCtx.scale(window.devicePixelRatio || 1, window.devicePixelRatio || 1); waveCtx.scale(window.devicePixelRatio || 1, window.devicePixelRatio || 1);
@@ -321,7 +322,8 @@
const host = window.location.hostname || "localhost"; const host = window.location.hostname || "localhost";
const port = window.location.port || "8000"; const port = window.location.port || "8000";
const defaultWebSocketUrl = `ws://${host}:${port}/asr`; const protocol = window.location.protocol === "https:" ? "wss" : "ws";
const defaultWebSocketUrl = `${protocol}://${host}:${port}/asr`;
websocketInput.value = defaultWebSocketUrl; websocketInput.value = defaultWebSocketUrl;
websocketUrl = defaultWebSocketUrl; websocketUrl = defaultWebSocketUrl;
@@ -356,18 +358,31 @@
websocket.onclose = () => { websocket.onclose = () => {
if (userClosing) { if (userClosing) {
if (!statusText.textContent.includes("Recording stopped. Processing final audio")) { // This is a bit of a hack. We should have a better way to handle this. eg. using a status code. if (waitingForStop) {
statusText.textContent = "Finished processing audio! Ready to record again."; statusText.textContent = "Processing finalized or connection closed.";
if (lastReceivedData) {
renderLinesWithBuffer(
lastReceivedData.lines || [],
lastReceivedData.buffer_diarization || "",
lastReceivedData.buffer_transcription || "",
0, 0, true // isFinalizing = true
);
}
} }
waitingForStop = false; // If ready_to_stop was received, statusText is already "Finished processing..."
// and waitingForStop is false.
} else { } else {
statusText.textContent = statusText.textContent = "Disconnected from the WebSocket server. (Check logs if model is loading.)";
"Disconnected from the WebSocket server. (Check logs if model is loading.)";
if (isRecording) { if (isRecording) {
stopRecording(); stopRecording();
} }
} }
userClosing = false; isRecording = false;
waitingForStop = false;
userClosing = false;
lastReceivedData = null;
websocket = null;
updateUI();
}; };
websocket.onerror = () => { websocket.onerror = () => {
@@ -381,24 +396,31 @@
// Check for status messages // Check for status messages
if (data.type === "ready_to_stop") { if (data.type === "ready_to_stop") {
console.log("Ready to stop, closing WebSocket"); console.log("Ready to stop received, finalizing display and closing WebSocket.");
// signal that we are not waiting for stop anymore
waitingForStop = false; waitingForStop = false;
recordButton.disabled = false; // this should be elsewhere
console.log("Record button enabled");
//Now we can close the WebSocket if (lastReceivedData) {
if (websocket) { renderLinesWithBuffer(
websocket.close(); lastReceivedData.lines || [],
websocket = null; lastReceivedData.buffer_diarization || "",
lastReceivedData.buffer_transcription || "",
0, // No more lag
0, // No more lag
true // isFinalizing = true
);
} }
statusText.textContent = "Finished processing audio! Ready to record again.";
recordButton.disabled = false;
if (websocket) {
websocket.close(); // will trigger onclose
// websocket = null; // onclose handle setting websocket to null
}
return; return;
} }
lastReceivedData = data;
// Handle normal transcription updates // Handle normal transcription updates
const { const {
lines = [], lines = [],
@@ -413,13 +435,14 @@
buffer_diarization, buffer_diarization,
buffer_transcription, buffer_transcription,
remaining_time_diarization, remaining_time_diarization,
remaining_time_transcription remaining_time_transcription,
false // isFinalizing = false for normal updates
); );
}; };
}); });
} }
function renderLinesWithBuffer(lines, buffer_diarization, buffer_transcription, remaining_time_diarization, remaining_time_transcription) { function renderLinesWithBuffer(lines, buffer_diarization, buffer_transcription, remaining_time_diarization, remaining_time_transcription, isFinalizing = false) {
const linesHtml = lines.map((item, idx) => { const linesHtml = lines.map((item, idx) => {
let timeInfo = ""; let timeInfo = "";
if (item.beg !== undefined && item.end !== undefined) { if (item.beg !== undefined && item.end !== undefined) {
@@ -429,30 +452,46 @@
let speakerLabel = ""; let speakerLabel = "";
if (item.speaker === -2) { if (item.speaker === -2) {
speakerLabel = `<span class="silence">Silence<span id='timeInfo'>${timeInfo}</span></span>`; speakerLabel = `<span class="silence">Silence<span id='timeInfo'>${timeInfo}</span></span>`;
} else if (item.speaker == 0) { } else if (item.speaker == 0 && !isFinalizing) {
speakerLabel = `<span class='loading'><span class="spinner"></span><span id='timeInfo'>${remaining_time_diarization} second(s) of audio are undergoing diarization</span></span>`; speakerLabel = `<span class='loading'><span class="spinner"></span><span id='timeInfo'>${remaining_time_diarization} second(s) of audio are undergoing diarization</span></span>`;
} else if (item.speaker == -1) { } else if (item.speaker == -1) {
speakerLabel = `<span id="speaker"><span id='timeInfo'>${timeInfo}</span></span>`; speakerLabel = `<span id="speaker">Speaker 1<span id='timeInfo'>${timeInfo}</span></span>`;
} else if (item.speaker !== -1) { } else if (item.speaker !== -1 && item.speaker !== 0) {
speakerLabel = `<span id="speaker">Speaker ${item.speaker}<span id='timeInfo'>${timeInfo}</span></span>`; speakerLabel = `<span id="speaker">Speaker ${item.speaker}<span id='timeInfo'>${timeInfo}</span></span>`;
} }
let textContent = item.text;
if (idx === lines.length - 1) { let currentLineText = item.text || "";
speakerLabel += `<span class="label_transcription"><span class="spinner"></span>Transcription lag <span id='timeInfo'>${remaining_time_transcription}s</span></span>`
} if (idx === lines.length - 1) {
if (idx === lines.length - 1 && buffer_diarization) { if (!isFinalizing) {
speakerLabel += `<span class="label_diarization"><span class="spinner"></span>Diarization lag<span id='timeInfo'>${remaining_time_diarization}s</span></span>` if (remaining_time_transcription > 0) {
textContent += `<span class="buffer_diarization">${buffer_diarization}</span>`; speakerLabel += `<span class="label_transcription"><span class="spinner"></span>Transcription lag <span id='timeInfo'>${remaining_time_transcription}s</span></span>`;
} }
if (idx === lines.length - 1) { if (buffer_diarization && remaining_time_diarization > 0) {
textContent += `<span class="buffer_transcription">${buffer_transcription}</span>`; speakerLabel += `<span class="label_diarization"><span class="spinner"></span>Diarization lag<span id='timeInfo'>${remaining_time_diarization}s</span></span>`;
}
}
if (buffer_diarization) {
if (isFinalizing) {
currentLineText += (currentLineText.length > 0 && buffer_diarization.trim().length > 0 ? " " : "") + buffer_diarization.trim();
} else {
currentLineText += `<span class="buffer_diarization">${buffer_diarization}</span>`;
}
}
if (buffer_transcription) {
if (isFinalizing) {
currentLineText += (currentLineText.length > 0 && buffer_transcription.trim().length > 0 ? " " : "") + buffer_transcription.trim();
} else {
currentLineText += `<span class="buffer_transcription">${buffer_transcription}</span>`;
}
}
} }
return currentLineText.trim().length > 0 || speakerLabel.length > 0
return textContent ? `<p>${speakerLabel}<br/><div class='textcontent'>${currentLineText}</div></p>`
? `<p>${speakerLabel}<br/><div class='textcontent'>${textContent}</div></p>` : `<p>${speakerLabel}<br/></p>`;
: `<p>${speakerLabel}<br/></p>`;
}).join(""); }).join("");
linesTranscriptDiv.innerHTML = linesHtml; linesTranscriptDiv.innerHTML = linesHtml;
@@ -577,20 +616,6 @@
timerElement.textContent = "00:00"; timerElement.textContent = "00:00";
startTime = null; startTime = null;
if (websocket && websocket.readyState === WebSocket.OPEN) {
try {
await websocket.send(JSON.stringify({
type: "stop",
message: "User stopped recording"
}));
statusText.textContent = "Recording stopped. Processing final audio...";
} catch (e) {
console.error("Could not send stop message:", e);
statusText.textContent = "Recording stopped. Error during final audio processing.";
websocket.close();
websocket = null;
}
}
isRecording = false; isRecording = false;
updateUI(); updateUI();
@@ -624,19 +649,22 @@
function updateUI() { function updateUI() {
recordButton.classList.toggle("recording", isRecording); recordButton.classList.toggle("recording", isRecording);
recordButton.disabled = waitingForStop;
if (waitingForStop) { if (waitingForStop) {
statusText.textContent = "Please wait for processing to complete..."; if (statusText.textContent !== "Recording stopped. Processing final audio...") {
recordButton.disabled = true; // Optionally disable the button while waiting statusText.textContent = "Please wait for processing to complete...";
console.log("Record button disabled"); }
} else if (isRecording) { } else if (isRecording) {
statusText.textContent = "Recording..."; statusText.textContent = "Recording...";
recordButton.disabled = false;
console.log("Record button enabled");
} else { } else {
statusText.textContent = "Click to start transcription"; if (statusText.textContent !== "Finished processing audio! Ready to record again." &&
statusText.textContent !== "Processing finalized or connection closed.") {
statusText.textContent = "Click to start transcription";
}
}
if (!waitingForStop) {
recordButton.disabled = false; recordButton.disabled = false;
console.log("Record button enabled");
} }
} }
@@ -644,4 +672,4 @@
</script> </script>
</body> </body>
</html> </html>

View File

@@ -179,7 +179,7 @@ def warmup_asr(asr, warmup_file=None, timeout=5):
logger.warning(f"Warmup file {warmup_file} invalid or missing.") logger.warning(f"Warmup file {warmup_file} invalid or missing.")
return False return False
print(f"Warmping up Whisper with {warmup_file}") print(f"Warming up Whisper with {warmup_file}")
try: try:
import librosa import librosa
audio, sr = librosa.load(warmup_file, sr=16000) audio, sr = librosa.load(warmup_file, sr=16000)