15 Commits
0.1.5 ... 0.1.7

Author SHA1 Message Date
Quentin Fuxa
eabd1b199a to 0.1.7 2025-05-28 13:29:45 +02:00
Quentin Fuxa
f7644268c1 Message when launching transcription and no audio is detected 2025-05-28 13:25:49 +02:00
Quentin Fuxa
34e8fe260e lag information in real time even when no audio is detected 2025-05-28 12:25:47 +02:00
Quentin Fuxa
debfefaf3e Merge pull request #128 from QuentinFuxa/vac-update
Vac update
2025-05-28 11:51:37 +02:00
Quentin Fuxa
101ca9ef90 Update README.md 2025-05-28 11:50:44 +02:00
Quentin Fuxa
94bb05d53e Update README.md 2025-05-28 11:48:46 +02:00
Quentin Fuxa
6797b88176 Error handling for missing FFmpeg in start_ffmpeg_decoder 2025-05-28 11:43:30 +02:00
Quentin Fuxa
46770efd6c correct error when using VAC 2025-05-28 11:43:18 +02:00
Quentin Fuxa
b23ef3ec3e refactor license for correct shields.io detection 2025-05-28 11:42:26 +02:00
Quentin Fuxa
fa29a24abe Bump version to 0.1.6 2025-05-07 11:45:33 +02:00
Quentin Fuxa
fea3c3553c logging in ASR proc. includes internal buffer duration and transcription lag 2025-05-07 11:45:00 +02:00
Quentin Fuxa
d6d65a663b errors handling when end of transcription 2025-05-07 10:56:04 +02:00
Quentin Fuxa
083d5b2f44 uses sentinel object when end of transcription, to properly terminate tasks 2025-05-07 10:55:44 +02:00
Quentin Fuxa
8e4674b093 End of transcription : Properly sends signal back to the endpoint 2025-05-07 10:55:12 +02:00
Quentin Fuxa
bc7c32100f Mention third-party components 2025-04-14 00:21:43 +02:00
7 changed files with 410 additions and 176 deletions

33
LICENSE
View File

@@ -1,21 +1,28 @@
MIT License
Copyright (c) 2023 ÚFAL
Copyright (c) 2025 Quentin Fuxa.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
---
Based on:
- **whisper_streaming** by ÚFAL MIT License https://github.com/ufal/whisper_streaming. The original work by ÚFAL. License: https://github.com/ufal/whisper_streaming/blob/main/LICENSE
- **silero-vad** by Snakers4 MIT License https://github.com/snakers4/silero-vad. The work by Snakers4 (silero-vad). License: https://github.com/snakers4/silero-vad/blob/f6b1294cb27590fb2452899df98fb234dfef1134/LICENSE
- **Diart** by juanmc2005 MIT License https://github.com/juanmc2005/diart. The work in Diart by juanmc2005. License: https://github.com/juanmc2005/diart/blob/main/LICENSE

View File

@@ -9,8 +9,8 @@
<p align="center">
<a href="https://pypi.org/project/whisperlivekit/"><img alt="PyPI Version" src="https://img.shields.io/pypi/v/whisperlivekit?color=g"></a>
<a href="https://pepy.tech/project/whisperlivekit"><img alt="PyPI Downloads" src="https://static.pepy.tech/personalized-badge/whisperlivekit?period=total&units=international_system&left_color=grey&right_color=brightgreen&left_text=downloads"></a>
<a href="https://pypi.org/project/whisperlivekit/"><img alt="Python Versions" src="https://img.shields.io/badge/python-3.9%20%7C%203.10%20%7C%203.11%20%7C%203.12-dark_green"></a>
<a href="https://github.com/QuentinFuxa/WhisperLiveKit/blob/main/LICENSE"><img alt="License" src="https://img.shields.io/github/license/QuentinFuxa/WhisperLiveKit?color=blue"></a>
<a href="https://pypi.org/project/whisperlivekit/"><img alt="Python Versions" src="https://img.shields.io/badge/python-3.9--3.13-dark_green"></a>
<a href="https://github.com/QuentinFuxa/WhisperLiveKit/blob/main/LICENSE"><img alt="License" src="https://img.shields.io/badge/License-MIT-dark_green"></a>
</p>
## 🚀 Overview

View File

@@ -1,7 +1,7 @@
from setuptools import setup, find_packages
setup(
name="whisperlivekit",
version="0.1.5",
version="0.1.7",
description="Real-time, Fully Local Whisper's Speech-to-Text and Speaker Diarization",
long_description=open("README.md", "r", encoding="utf-8").read(),
long_description_content_type="text/markdown",

View File

@@ -15,6 +15,8 @@ logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
SENTINEL = object() # unique sentinel object for end of stream marker
def format_time(seconds: float) -> str:
"""Format seconds as HH:MM:SS."""
return str(timedelta(seconds=int(seconds)))
@@ -41,8 +43,9 @@ class AudioProcessor:
self.last_ffmpeg_activity = time()
self.ffmpeg_health_check_interval = 5
self.ffmpeg_max_idle_time = 10
# State management
self.is_stopping = False
self.tokens = []
self.buffer_transcription = ""
self.buffer_diarization = ""
@@ -62,6 +65,13 @@ class AudioProcessor:
self.transcription_queue = asyncio.Queue() if self.args.transcription else None
self.diarization_queue = asyncio.Queue() if self.args.diarization else None
self.pcm_buffer = bytearray()
# Task references
self.transcription_task = None
self.diarization_task = None
self.ffmpeg_reader_task = None
self.watchdog_task = None
self.all_tasks_for_cleanup = []
# Initialize transcription engine if enabled
if self.args.transcription:
@@ -73,10 +83,33 @@ class AudioProcessor:
def start_ffmpeg_decoder(self):
"""Start FFmpeg process for WebM to PCM conversion."""
return (ffmpeg.input("pipe:0", format="webm")
.output("pipe:1", format="s16le", acodec="pcm_s16le",
ac=self.channels, ar=str(self.sample_rate))
.run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True))
try:
return (ffmpeg.input("pipe:0", format="webm")
.output("pipe:1", format="s16le", acodec="pcm_s16le",
ac=self.channels, ar=str(self.sample_rate))
.run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True))
except FileNotFoundError:
error = """
FFmpeg is not installed or not found in your system's PATH.
Please install FFmpeg to enable audio processing.
Installation instructions:
# Ubuntu/Debian:
sudo apt update && sudo apt install ffmpeg
# macOS (using Homebrew):
brew install ffmpeg
# Windows:
# 1. Download the latest static build from https://ffmpeg.org/download.html
# 2. Extract the archive (e.g., to C:\\FFmpeg).
# 3. Add the 'bin' directory (e.g., C:\\FFmpeg\\bin) to your system's PATH environment variable.
After installation, please restart the application.
"""
logger.error(error)
raise FileNotFoundError(error)
async def restart_ffmpeg(self):
"""Restart the FFmpeg process after failure."""
@@ -210,7 +243,7 @@ class AudioProcessor:
self.last_ffmpeg_activity = time()
if not chunk:
logger.info("FFmpeg stdout closed.")
logger.info("FFmpeg stdout closed, no more data to read.")
break
self.pcm_buffer.extend(chunk)
@@ -245,45 +278,86 @@ class AudioProcessor:
logger.warning(f"Exception in ffmpeg_stdout_reader: {e}")
logger.warning(f"Traceback: {traceback.format_exc()}")
break
logger.info("FFmpeg stdout processing finished. Signaling downstream processors.")
if self.args.transcription and self.transcription_queue:
await self.transcription_queue.put(SENTINEL)
logger.debug("Sentinel put into transcription_queue.")
if self.args.diarization and self.diarization_queue:
await self.diarization_queue.put(SENTINEL)
logger.debug("Sentinel put into diarization_queue.")
async def transcription_processor(self):
"""Process audio chunks for transcription."""
self.full_transcription = ""
self.sep = self.online.asr.sep
cumulative_pcm_duration_stream_time = 0.0
while True:
try:
pcm_array = await self.transcription_queue.get()
if pcm_array is SENTINEL:
logger.debug("Transcription processor received sentinel. Finishing.")
self.transcription_queue.task_done()
break
logger.info(f"{len(self.online.audio_buffer) / self.online.SAMPLING_RATE} seconds of audio to process.")
if not self.online: # Should not happen if queue is used
logger.warning("Transcription processor: self.online not initialized.")
self.transcription_queue.task_done()
continue
asr_internal_buffer_duration_s = len(self.online.audio_buffer) / self.online.SAMPLING_RATE
transcription_lag_s = max(0.0, time() - self.beg_loop - self.end_buffer)
logger.info(
f"ASR processing: internal_buffer={asr_internal_buffer_duration_s:.2f}s, "
f"lag={transcription_lag_s:.2f}s."
)
# Process transcription
self.online.insert_audio_chunk(pcm_array)
new_tokens = self.online.process_iter()
duration_this_chunk = len(pcm_array) / self.sample_rate if isinstance(pcm_array, np.ndarray) else 0
cumulative_pcm_duration_stream_time += duration_this_chunk
stream_time_end_of_current_pcm = cumulative_pcm_duration_stream_time
self.online.insert_audio_chunk(pcm_array, stream_time_end_of_current_pcm)
new_tokens, current_audio_processed_upto = self.online.process_iter()
if new_tokens:
self.full_transcription += self.sep.join([t.text for t in new_tokens])
# Get buffer information
_buffer = self.online.get_buffer()
buffer = _buffer.text
end_buffer = _buffer.end if _buffer.end else (
new_tokens[-1].end if new_tokens else 0
)
_buffer_transcript_obj = self.online.get_buffer()
buffer_text = _buffer_transcript_obj.text
candidate_end_times = [self.end_buffer]
if new_tokens:
candidate_end_times.append(new_tokens[-1].end)
if _buffer_transcript_obj.end is not None:
candidate_end_times.append(_buffer_transcript_obj.end)
candidate_end_times.append(current_audio_processed_upto)
new_end_buffer = max(candidate_end_times)
# Avoid duplicating content
if buffer in self.full_transcription:
buffer = ""
if buffer_text in self.full_transcription:
buffer_text = ""
await self.update_transcription(
new_tokens, buffer, end_buffer, self.full_transcription, self.sep
new_tokens, buffer_text, new_end_buffer, self.full_transcription, self.sep
)
self.transcription_queue.task_done()
except Exception as e:
logger.warning(f"Exception in transcription_processor: {e}")
logger.warning(f"Traceback: {traceback.format_exc()}")
finally:
self.transcription_queue.task_done()
if 'pcm_array' in locals() and pcm_array is not SENTINEL : # Check if pcm_array was assigned from queue
self.transcription_queue.task_done()
logger.info("Transcription processor task finished.")
async def diarization_processor(self, diarization_obj):
"""Process audio chunks for speaker diarization."""
@@ -292,6 +366,10 @@ class AudioProcessor:
while True:
try:
pcm_array = await self.diarization_queue.get()
if pcm_array is SENTINEL:
logger.debug("Diarization processor received sentinel. Finishing.")
self.diarization_queue.task_done()
break
# Process diarization
await diarization_obj.diarize(pcm_array)
@@ -303,12 +381,15 @@ class AudioProcessor:
)
await self.update_diarization(new_end, buffer_diarization)
self.diarization_queue.task_done()
except Exception as e:
logger.warning(f"Exception in diarization_processor: {e}")
logger.warning(f"Traceback: {traceback.format_exc()}")
finally:
self.diarization_queue.task_done()
if 'pcm_array' in locals() and pcm_array is not SENTINEL:
self.diarization_queue.task_done()
logger.info("Diarization processor task finished.")
async def results_formatter(self):
"""Format processing results for output."""
@@ -372,31 +453,51 @@ class AudioProcessor:
await self.update_diarization(end_attributed_speaker, combined)
buffer_diarization = combined
# Create response object
if not lines:
lines = [{
response_status = "active_transcription"
final_lines_for_response = lines.copy()
if not tokens and not buffer_transcription and not buffer_diarization:
response_status = "no_audio_detected"
final_lines_for_response = []
elif response_status == "active_transcription" and not final_lines_for_response:
final_lines_for_response = [{
"speaker": 1,
"text": "",
"beg": format_time(0),
"end": format_time(tokens[-1].end if tokens else 0),
"beg": format_time(state.get("end_buffer", 0)),
"end": format_time(state.get("end_buffer", 0)),
"diff": 0
}]
response = {
"lines": lines,
"status": response_status,
"lines": final_lines_for_response,
"buffer_transcription": buffer_transcription,
"buffer_diarization": buffer_diarization,
"remaining_time_transcription": state["remaining_time_transcription"],
"remaining_time_diarization": state["remaining_time_diarization"]
}
# Only yield if content has changed
response_content = ' '.join([f"{line['speaker']} {line['text']}" for line in lines]) + \
f" | {buffer_transcription} | {buffer_diarization}"
current_response_signature = f"{response_status} | " + \
' '.join([f"{line['speaker']} {line['text']}" for line in final_lines_for_response]) + \
f" | {buffer_transcription} | {buffer_diarization}"
if response_content != self.last_response_content and (lines or buffer_transcription or buffer_diarization):
if current_response_signature != self.last_response_content and \
(final_lines_for_response or buffer_transcription or buffer_diarization or response_status == "no_audio_detected"):
yield response
self.last_response_content = response_content
self.last_response_content = current_response_signature
# Check for termination condition
if self.is_stopping:
all_processors_done = True
if self.args.transcription and self.transcription_task and not self.transcription_task.done():
all_processors_done = False
if self.args.diarization and self.diarization_task and not self.diarization_task.done():
all_processors_done = False
if all_processors_done:
logger.info("Results formatter: All upstream processors are done and in stopping state. Terminating.")
final_state = await self.get_current_state()
return
await asyncio.sleep(0.1) # Avoid overwhelming the client
@@ -407,65 +508,117 @@ class AudioProcessor:
async def create_tasks(self):
"""Create and start processing tasks."""
tasks = []
self.all_tasks_for_cleanup = []
processing_tasks_for_watchdog = []
if self.args.transcription and self.online:
tasks.append(asyncio.create_task(self.transcription_processor()))
self.transcription_task = asyncio.create_task(self.transcription_processor())
self.all_tasks_for_cleanup.append(self.transcription_task)
processing_tasks_for_watchdog.append(self.transcription_task)
if self.args.diarization and self.diarization:
tasks.append(asyncio.create_task(self.diarization_processor(self.diarization)))
self.diarization_task = asyncio.create_task(self.diarization_processor(self.diarization))
self.all_tasks_for_cleanup.append(self.diarization_task)
processing_tasks_for_watchdog.append(self.diarization_task)
tasks.append(asyncio.create_task(self.ffmpeg_stdout_reader()))
# Monitor overall system health
async def watchdog():
while True:
try:
await asyncio.sleep(10) # Check every 10 seconds instead of 60
current_time = time()
# Check for stalled tasks
for i, task in enumerate(tasks):
if task.done():
exc = task.exception() if task.done() else None
task_name = task.get_name() if hasattr(task, 'get_name') else f"Task {i}"
logger.error(f"{task_name} unexpectedly completed with exception: {exc}")
# Check for FFmpeg process health with shorter thresholds
ffmpeg_idle_time = current_time - self.last_ffmpeg_activity
if ffmpeg_idle_time > 15: # 15 seconds instead of 180
logger.warning(f"FFmpeg idle for {ffmpeg_idle_time:.2f}s - may need attention")
# Force restart after 30 seconds of inactivity (instead of 600)
if ffmpeg_idle_time > 30:
logger.error("FFmpeg idle for too long, forcing restart")
await self.restart_ffmpeg()
except Exception as e:
logger.error(f"Error in watchdog task: {e}")
self.ffmpeg_reader_task = asyncio.create_task(self.ffmpeg_stdout_reader())
self.all_tasks_for_cleanup.append(self.ffmpeg_reader_task)
processing_tasks_for_watchdog.append(self.ffmpeg_reader_task)
tasks.append(asyncio.create_task(watchdog()))
self.tasks = tasks
# Monitor overall system health
self.watchdog_task = asyncio.create_task(self.watchdog(processing_tasks_for_watchdog))
self.all_tasks_for_cleanup.append(self.watchdog_task)
return self.results_formatter()
async def watchdog(self, tasks_to_monitor):
"""Monitors the health of critical processing tasks."""
while True:
try:
await asyncio.sleep(10)
current_time = time()
for i, task in enumerate(tasks_to_monitor):
if task.done():
exc = task.exception()
task_name = task.get_name() if hasattr(task, 'get_name') else f"Monitored Task {i}"
if exc:
logger.error(f"{task_name} unexpectedly completed with exception: {exc}")
else:
logger.info(f"{task_name} completed normally.")
ffmpeg_idle_time = current_time - self.last_ffmpeg_activity
if ffmpeg_idle_time > 15:
logger.warning(f"FFmpeg idle for {ffmpeg_idle_time:.2f}s - may need attention.")
if ffmpeg_idle_time > 30 and not self.is_stopping:
logger.error("FFmpeg idle for too long and not in stopping phase, forcing restart.")
await self.restart_ffmpeg()
except asyncio.CancelledError:
logger.info("Watchdog task cancelled.")
break
except Exception as e:
logger.error(f"Error in watchdog task: {e}", exc_info=True)
async def cleanup(self):
"""Clean up resources when processing is complete."""
for task in self.tasks:
task.cancel()
logger.info("Starting cleanup of AudioProcessor resources.")
for task in self.all_tasks_for_cleanup:
if task and not task.done():
task.cancel()
created_tasks = [t for t in self.all_tasks_for_cleanup if t]
if created_tasks:
await asyncio.gather(*created_tasks, return_exceptions=True)
logger.info("All processing tasks cancelled or finished.")
if self.ffmpeg_process:
if self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
try:
self.ffmpeg_process.stdin.close()
except Exception as e:
logger.warning(f"Error closing ffmpeg stdin during cleanup: {e}")
try:
await asyncio.gather(*self.tasks, return_exceptions=True)
self.ffmpeg_process.stdin.close()
self.ffmpeg_process.wait()
except Exception as e:
logger.warning(f"Error during cleanup: {e}")
if self.args.diarization and hasattr(self, 'diarization'):
# Wait for ffmpeg process to terminate
if self.ffmpeg_process.poll() is None: # Check if process is still running
logger.info("Waiting for FFmpeg process to terminate...")
try:
# Run wait in executor to avoid blocking async loop
await asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait, 5.0) # 5s timeout
except Exception as e: # subprocess.TimeoutExpired is not directly caught by asyncio.wait_for with run_in_executor
logger.warning(f"FFmpeg did not terminate gracefully, killing. Error: {e}")
self.ffmpeg_process.kill()
await asyncio.get_event_loop().run_in_executor(None, self.ffmpeg_process.wait) # Wait for kill
logger.info("FFmpeg process terminated.")
if self.args.diarization and hasattr(self, 'diarization') and hasattr(self.diarization, 'close'):
self.diarization.close()
logger.info("AudioProcessor cleanup complete.")
async def process_audio(self, message):
"""Process incoming audio data."""
# If already stopping or stdin is closed, ignore further audio, especially residual chunks.
if self.is_stopping or (self.ffmpeg_process and self.ffmpeg_process.stdin and self.ffmpeg_process.stdin.closed):
logger.warning(f"AudioProcessor is stopping or stdin is closed. Ignoring incoming audio message (length: {len(message)}).")
if not message and self.ffmpeg_process and self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
logger.info("Received empty message while already in stopping state; ensuring stdin is closed.")
try:
self.ffmpeg_process.stdin.close()
except Exception as e:
logger.warning(f"Error closing ffmpeg stdin on redundant stop signal during stopping state: {e}")
return
if not message: # primary signal to start stopping
logger.info("Empty audio message received, initiating stop sequence.")
self.is_stopping = True
if self.ffmpeg_process and self.ffmpeg_process.stdin and not self.ffmpeg_process.stdin.closed:
try:
self.ffmpeg_process.stdin.close()
logger.info("FFmpeg stdin closed due to primary stop signal.")
except Exception as e:
logger.warning(f"Error closing ffmpeg stdin on stop: {e}")
return
retry_count = 0
max_retries = 3
@@ -517,4 +670,4 @@ class AudioProcessor:
else:
logger.error("Maximum retries reached for FFmpeg process")
await self.restart_ffmpeg()
return
return

View File

@@ -44,6 +44,11 @@ async def handle_websocket_results(websocket, results_generator):
try:
async for response in results_generator:
await websocket.send_json(response)
# when the results_generator finishes it means all audio has been processed
logger.info("Results generator finished. Sending 'ready_to_stop' to client.")
await websocket.send_json({"type": "ready_to_stop"})
except WebSocketDisconnect:
logger.info("WebSocket disconnected while handling results (client likely closed connection).")
except Exception as e:
logger.warning(f"Error in WebSocket results handler: {e}")
@@ -62,12 +67,28 @@ async def websocket_endpoint(websocket: WebSocket):
while True:
message = await websocket.receive_bytes()
await audio_processor.process_audio(message)
except KeyError as e:
if 'bytes' in str(e):
logger.warning(f"Client has closed the connection.")
else:
logger.error(f"Unexpected KeyError in websocket_endpoint: {e}", exc_info=True)
except WebSocketDisconnect:
logger.warning("WebSocket disconnected.")
logger.info("WebSocket disconnected by client during message receiving loop.")
except Exception as e:
logger.error(f"Unexpected error in websocket_endpoint main loop: {e}", exc_info=True)
finally:
websocket_task.cancel()
logger.info("Cleaning up WebSocket endpoint...")
if not websocket_task.done():
websocket_task.cancel()
try:
await websocket_task
except asyncio.CancelledError:
logger.info("WebSocket results handler task was cancelled.")
except Exception as e:
logger.warning(f"Exception while awaiting websocket_task completion: {e}")
await audio_processor.cleanup()
logger.info("WebSocket endpoint cleaned up.")
logger.info("WebSocket endpoint cleaned up successfully.")
def main():
"""Entry point for the CLI command."""

View File

@@ -308,6 +308,7 @@
let waveCtx = waveCanvas.getContext("2d");
let animationFrame = null;
let waitingForStop = false;
let lastReceivedData = null;
waveCanvas.width = 60 * (window.devicePixelRatio || 1);
waveCanvas.height = 30 * (window.devicePixelRatio || 1);
waveCtx.scale(window.devicePixelRatio || 1, window.devicePixelRatio || 1);
@@ -357,18 +358,31 @@
websocket.onclose = () => {
if (userClosing) {
if (!statusText.textContent.includes("Recording stopped. Processing final audio")) { // This is a bit of a hack. We should have a better way to handle this. eg. using a status code.
statusText.textContent = "Finished processing audio! Ready to record again.";
if (waitingForStop) {
statusText.textContent = "Processing finalized or connection closed.";
if (lastReceivedData) {
renderLinesWithBuffer(
lastReceivedData.lines || [],
lastReceivedData.buffer_diarization || "",
lastReceivedData.buffer_transcription || "",
0, 0, true // isFinalizing = true
);
}
}
waitingForStop = false;
// If ready_to_stop was received, statusText is already "Finished processing..."
// and waitingForStop is false.
} else {
statusText.textContent =
"Disconnected from the WebSocket server. (Check logs if model is loading.)";
statusText.textContent = "Disconnected from the WebSocket server. (Check logs if model is loading.)";
if (isRecording) {
stopRecording();
stopRecording();
}
}
userClosing = false;
isRecording = false;
waitingForStop = false;
userClosing = false;
lastReceivedData = null;
websocket = null;
updateUI();
};
websocket.onerror = () => {
@@ -382,31 +396,39 @@
// Check for status messages
if (data.type === "ready_to_stop") {
console.log("Ready to stop, closing WebSocket");
// signal that we are not waiting for stop anymore
console.log("Ready to stop received, finalizing display and closing WebSocket.");
waitingForStop = false;
recordButton.disabled = false; // this should be elsewhere
console.log("Record button enabled");
//Now we can close the WebSocket
if (websocket) {
websocket.close();
websocket = null;
if (lastReceivedData) {
renderLinesWithBuffer(
lastReceivedData.lines || [],
lastReceivedData.buffer_diarization || "",
lastReceivedData.buffer_transcription || "",
0, // No more lag
0, // No more lag
true // isFinalizing = true
);
}
statusText.textContent = "Finished processing audio! Ready to record again.";
recordButton.disabled = false;
if (websocket) {
websocket.close(); // will trigger onclose
// websocket = null; // onclose handle setting websocket to null
}
return;
}
lastReceivedData = data;
// Handle normal transcription updates
const {
lines = [],
buffer_transcription = "",
buffer_diarization = "",
remaining_time_transcription = 0,
remaining_time_diarization = 0
remaining_time_diarization = 0,
status = "active_transcription"
} = data;
renderLinesWithBuffer(
@@ -414,13 +436,20 @@
buffer_diarization,
buffer_transcription,
remaining_time_diarization,
remaining_time_transcription
remaining_time_transcription,
false,
status
);
};
});
}
function renderLinesWithBuffer(lines, buffer_diarization, buffer_transcription, remaining_time_diarization, remaining_time_transcription) {
function renderLinesWithBuffer(lines, buffer_diarization, buffer_transcription, remaining_time_diarization, remaining_time_transcription, isFinalizing = false, current_status = "active_transcription") {
if (current_status === "no_audio_detected") {
linesTranscriptDiv.innerHTML = "<p style='text-align: center; color: #666; margin-top: 20px;'><em>No audio detected...</em></p>";
return;
}
const linesHtml = lines.map((item, idx) => {
let timeInfo = "";
if (item.beg !== undefined && item.end !== undefined) {
@@ -430,30 +459,46 @@
let speakerLabel = "";
if (item.speaker === -2) {
speakerLabel = `<span class="silence">Silence<span id='timeInfo'>${timeInfo}</span></span>`;
} else if (item.speaker == 0) {
} else if (item.speaker == 0 && !isFinalizing) {
speakerLabel = `<span class='loading'><span class="spinner"></span><span id='timeInfo'>${remaining_time_diarization} second(s) of audio are undergoing diarization</span></span>`;
} else if (item.speaker == -1) {
speakerLabel = `<span id="speaker"><span id='timeInfo'>${timeInfo}</span></span>`;
} else if (item.speaker !== -1) {
speakerLabel = `<span id="speaker">Speaker 1<span id='timeInfo'>${timeInfo}</span></span>`;
} else if (item.speaker !== -1 && item.speaker !== 0) {
speakerLabel = `<span id="speaker">Speaker ${item.speaker}<span id='timeInfo'>${timeInfo}</span></span>`;
}
let textContent = item.text;
if (idx === lines.length - 1) {
speakerLabel += `<span class="label_transcription"><span class="spinner"></span>Transcription lag <span id='timeInfo'>${remaining_time_transcription}s</span></span>`
}
if (idx === lines.length - 1 && buffer_diarization) {
speakerLabel += `<span class="label_diarization"><span class="spinner"></span>Diarization lag<span id='timeInfo'>${remaining_time_diarization}s</span></span>`
textContent += `<span class="buffer_diarization">${buffer_diarization}</span>`;
}
if (idx === lines.length - 1) {
textContent += `<span class="buffer_transcription">${buffer_transcription}</span>`;
let currentLineText = item.text || "";
if (idx === lines.length - 1) {
if (!isFinalizing) {
if (remaining_time_transcription > 0) {
speakerLabel += `<span class="label_transcription"><span class="spinner"></span>Transcription lag <span id='timeInfo'>${remaining_time_transcription}s</span></span>`;
}
if (buffer_diarization && remaining_time_diarization > 0) {
speakerLabel += `<span class="label_diarization"><span class="spinner"></span>Diarization lag<span id='timeInfo'>${remaining_time_diarization}s</span></span>`;
}
}
if (buffer_diarization) {
if (isFinalizing) {
currentLineText += (currentLineText.length > 0 && buffer_diarization.trim().length > 0 ? " " : "") + buffer_diarization.trim();
} else {
currentLineText += `<span class="buffer_diarization">${buffer_diarization}</span>`;
}
}
if (buffer_transcription) {
if (isFinalizing) {
currentLineText += (currentLineText.length > 0 && buffer_transcription.trim().length > 0 ? " " : "") + buffer_transcription.trim();
} else {
currentLineText += `<span class="buffer_transcription">${buffer_transcription}</span>`;
}
}
}
return textContent
? `<p>${speakerLabel}<br/><div class='textcontent'>${textContent}</div></p>`
: `<p>${speakerLabel}<br/></p>`;
return currentLineText.trim().length > 0 || speakerLabel.length > 0
? `<p>${speakerLabel}<br/><div class='textcontent'>${currentLineText}</div></p>`
: `<p>${speakerLabel}<br/></p>`;
}).join("");
linesTranscriptDiv.innerHTML = linesHtml;
@@ -578,20 +623,6 @@
timerElement.textContent = "00:00";
startTime = null;
if (websocket && websocket.readyState === WebSocket.OPEN) {
try {
await websocket.send(JSON.stringify({
type: "stop",
message: "User stopped recording"
}));
statusText.textContent = "Recording stopped. Processing final audio...";
} catch (e) {
console.error("Could not send stop message:", e);
statusText.textContent = "Recording stopped. Error during final audio processing.";
websocket.close();
websocket = null;
}
}
isRecording = false;
updateUI();
@@ -625,19 +656,22 @@
function updateUI() {
recordButton.classList.toggle("recording", isRecording);
recordButton.disabled = waitingForStop;
if (waitingForStop) {
statusText.textContent = "Please wait for processing to complete...";
recordButton.disabled = true; // Optionally disable the button while waiting
console.log("Record button disabled");
if (statusText.textContent !== "Recording stopped. Processing final audio...") {
statusText.textContent = "Please wait for processing to complete...";
}
} else if (isRecording) {
statusText.textContent = "Recording...";
recordButton.disabled = false;
console.log("Record button enabled");
} else {
statusText.textContent = "Click to start transcription";
if (statusText.textContent !== "Finished processing audio! Ready to record again." &&
statusText.textContent !== "Processing finalized or connection closed.") {
statusText.textContent = "Click to start transcription";
}
}
if (!waitingForStop) {
recordButton.disabled = false;
console.log("Record button enabled");
}
}
@@ -645,4 +679,4 @@
</script>
</body>
</html>
</html>

View File

@@ -144,7 +144,11 @@ class OnlineASRProcessor:
self.transcript_buffer.last_committed_time = self.buffer_time_offset
self.committed: List[ASRToken] = []
def insert_audio_chunk(self, audio: np.ndarray):
def get_audio_buffer_end_time(self) -> float:
"""Returns the absolute end time of the current audio_buffer."""
return self.buffer_time_offset + (len(self.audio_buffer) / self.SAMPLING_RATE)
def insert_audio_chunk(self, audio: np.ndarray, audio_stream_end_time: Optional[float] = None):
"""Append an audio chunk (a numpy array) to the current audio buffer."""
self.audio_buffer = np.append(self.audio_buffer, audio)
@@ -179,18 +183,19 @@ class OnlineASRProcessor:
return self.concatenate_tokens(self.transcript_buffer.buffer)
def process_iter(self) -> Transcript:
def process_iter(self) -> Tuple[List[ASRToken], float]:
"""
Processes the current audio buffer.
Returns a Transcript object representing the committed transcript.
Returns a tuple: (list of committed ASRToken objects, float representing the audio processed up to time).
"""
current_audio_processed_upto = self.get_audio_buffer_end_time()
prompt_text, _ = self.prompt()
logger.debug(
f"Transcribing {len(self.audio_buffer)/self.SAMPLING_RATE:.2f} seconds from {self.buffer_time_offset:.2f}"
)
res = self.asr.transcribe(self.audio_buffer, init_prompt=prompt_text)
tokens = self.asr.ts_words(res) # Expecting List[ASRToken]
tokens = self.asr.ts_words(res)
self.transcript_buffer.insert(tokens, self.buffer_time_offset)
committed_tokens = self.transcript_buffer.flush()
self.committed.extend(committed_tokens)
@@ -210,7 +215,7 @@ class OnlineASRProcessor:
logger.debug(
f"Length of audio buffer now: {len(self.audio_buffer)/self.SAMPLING_RATE:.2f} seconds"
)
return committed_tokens
return committed_tokens, current_audio_processed_upto
def chunk_completed_sentence(self):
"""
@@ -343,15 +348,17 @@ class OnlineASRProcessor:
)
sentences.append(sentence)
return sentences
def finish(self) -> Transcript:
def finish(self) -> Tuple[List[ASRToken], float]:
"""
Flush the remaining transcript when processing ends.
Returns a tuple: (list of remaining ASRToken objects, float representing the final audio processed up to time).
"""
remaining_tokens = self.transcript_buffer.buffer
final_transcript = self.concatenate_tokens(remaining_tokens)
logger.debug(f"Final non-committed transcript: {final_transcript}")
self.buffer_time_offset += len(self.audio_buffer) / self.SAMPLING_RATE
return final_transcript
logger.debug(f"Final non-committed tokens: {remaining_tokens}")
final_processed_upto = self.buffer_time_offset + (len(self.audio_buffer) / self.SAMPLING_RATE)
self.buffer_time_offset = final_processed_upto
return remaining_tokens, final_processed_upto
def concatenate_tokens(
self,
@@ -384,7 +391,8 @@ class VACOnlineASRProcessor:
def __init__(self, online_chunk_size: float, *args, **kwargs):
self.online_chunk_size = online_chunk_size
self.online = OnlineASRProcessor(*args, **kwargs)
self.asr = self.online.asr
# Load a VAD model (e.g. Silero VAD)
import torch
model, _ = torch.hub.load(repo_or_dir="snakers4/silero-vad", model="silero_vad")
@@ -392,28 +400,35 @@ class VACOnlineASRProcessor:
self.vac = FixedVADIterator(model)
self.logfile = self.online.logfile
self.last_input_audio_stream_end_time: float = 0.0
self.init()
def init(self):
self.online.init()
self.vac.reset_states()
self.current_online_chunk_buffer_size = 0
self.last_input_audio_stream_end_time = self.online.buffer_time_offset
self.is_currently_final = False
self.status: Optional[str] = None # "voice" or "nonvoice"
self.audio_buffer = np.array([], dtype=np.float32)
self.buffer_offset = 0 # in frames
def get_audio_buffer_end_time(self) -> float:
"""Returns the absolute end time of the audio processed by the underlying OnlineASRProcessor."""
return self.online.get_audio_buffer_end_time()
def clear_buffer(self):
self.buffer_offset += len(self.audio_buffer)
self.audio_buffer = np.array([], dtype=np.float32)
def insert_audio_chunk(self, audio: np.ndarray):
def insert_audio_chunk(self, audio: np.ndarray, audio_stream_end_time: float):
"""
Process an incoming small audio chunk:
- run VAD on the chunk,
- decide whether to send the audio to the online ASR processor immediately,
- and/or to mark the current utterance as finished.
"""
self.last_input_audio_stream_end_time = audio_stream_end_time
res = self.vac(audio)
self.audio_buffer = np.append(self.audio_buffer, audio)
@@ -455,10 +470,11 @@ class VACOnlineASRProcessor:
self.buffer_offset += max(0, len(self.audio_buffer) - self.SAMPLING_RATE)
self.audio_buffer = self.audio_buffer[-self.SAMPLING_RATE:]
def process_iter(self) -> Transcript:
def process_iter(self) -> Tuple[List[ASRToken], float]:
"""
Depending on the VAD status and the amount of accumulated audio,
process the current audio chunk.
Returns a tuple: (list of committed ASRToken objects, float representing the audio processed up to time).
"""
if self.is_currently_final:
return self.finish()
@@ -467,17 +483,20 @@ class VACOnlineASRProcessor:
return self.online.process_iter()
else:
logger.debug("No online update, only VAD")
return Transcript(None, None, "")
return [], self.last_input_audio_stream_end_time
def finish(self) -> Transcript:
"""Finish processing by flushing any remaining text."""
result = self.online.finish()
def finish(self) -> Tuple[List[ASRToken], float]:
"""
Finish processing by flushing any remaining text.
Returns a tuple: (list of remaining ASRToken objects, float representing the final audio processed up to time).
"""
result_tokens, processed_upto = self.online.finish()
self.current_online_chunk_buffer_size = 0
self.is_currently_final = False
return result
return result_tokens, processed_upto
def get_buffer(self):
"""
Get the unvalidated buffer in string format.
"""
return self.online.concatenate_tokens(self.online.transcript_buffer.buffer).text
return self.online.concatenate_tokens(self.online.transcript_buffer.buffer)