23 Commits

Author SHA1 Message Date
Quentin Fuxa
9d4ae33249 WIP. Trying ten VAD #280 2025-11-23 11:20:00 +01:00
Quentin Fuxa
6206fff118 0.2.15 2025-11-21 23:52:00 +01:00
Quentin Fuxa
b5067249c0 stt/diar/nllw alignment: internal rework 5 2025-11-20 23:52:00 +01:00
Quentin Fuxa
f4f9831d39 stt/diar/nllw alignment: internal rework 5 2025-11-20 23:52:00 +01:00
Quentin Fuxa
254faaf64c stt/diar/nllw alignment: internal rework 5 2025-11-20 23:52:00 +01:00
Quentin Fuxa
8e7aea4fcf internal rework 4 2025-11-20 23:45:20 +01:00
Quentin Fuxa
270faf2069 internal rework 3 2025-11-20 22:28:30 +01:00
Quentin Fuxa
b7c1cc77cc internal rework 2 2025-11-20 22:06:38 +01:00
Quentin Fuxa
9a45ec221c internal rework 1 2025-11-20 12:58:38 +01:00
Quentin Fuxa
3e13ee6fc3 bump to post4 2025-11-19 21:23:43 +01:00
Quentin Fuxa
b7d20a0ff0 segment attribution in result formatter 2025-11-19 21:10:28 +01:00
Quentin Fuxa
c1bb9c2bde reduce flickering remaining_time_transcription 2025-11-19 19:09:37 +01:00
Quentin Fuxa
11e9def0b2 diarization corrections 2025-11-19 19:06:03 +01:00
Quentin Fuxa
3104f40f6e fixes #279 #278 2025-11-19 18:17:50 +01:00
Quentin Fuxa
e9b4ceeee5 Add audio partial silence in chunks handling. bump to 0.2.14.post3 2025-11-17 22:52:00 +01:00
Quentin Fuxa
437641fb43 reduce min-chunk-size to 0.1, set default model to base 2027-04-25 23:52:00 +02:00
Quentin Fuxa
bfd60b3921 Add audio partial silence in chunks handling. bump to 0.2.14.post2 2025-11-17 22:52:00 +01:00
Quentin Fuxa
1e67bf97f0 improve buffering when use of heavy models 2027-04-25 23:52:00 +02:00
Quentin Fuxa
bbd4fd6cff Merge branch 'improve_EOS_handling' 2025-11-16 22:30:31 +01:00
Quentin Fuxa
28985962a0 Silence handling: finish transcription even if not validated at the BEGINNING of the silence 2025-11-16 22:29:08 +01:00
Quentin Fuxa
a38c103fcd simulstreaming coreml encoder compatibility 2025-11-16 21:24:14 +01:00
Quentin Fuxa
4d2ffb24f8 coreml conversion 2025-11-16 19:11:43 +01:00
Quentin Fuxa
1bbbb7903c lora loader in shared whisper core 2025-11-16 18:44:35 +01:00
23 changed files with 1224 additions and 1226 deletions

View File

@@ -141,7 +141,7 @@ async def websocket_endpoint(websocket: WebSocket):
|-----------|-------------|---------|
| `--model` | Whisper model size. List and recommendations [here](https://github.com/QuentinFuxa/WhisperLiveKit/blob/main/docs/available_models.md) | `small` |
| `--model-path` | Local .pt file/directory **or** Hugging Face repo ID containing the Whisper model. Overrides `--model`. Recommendations [here](https://github.com/QuentinFuxa/WhisperLiveKit/blob/main/docs/models_compatible_formats.md) | `None` |
| `--language` | List [here](https://github.com/QuentinFuxa/WhisperLiveKit/blob/main/whisperlivekit/simul_whisper/whisper/tokenizer.py). If you use `auto`, the model attempts to detect the language automatically, but it tends to bias towards English. | `auto` |
| `--language` | List [here](https://github.com/QuentinFuxa/WhisperLiveKit/blob/main/whisperlivekit/whisper/tokenizer.py). If you use `auto`, the model attempts to detect the language automatically, but it tends to bias towards English. | `auto` |
| `--target-language` | If set, translates using [NLLW](https://github.com/QuentinFuxa/NoLanguageLeftWaiting). [200 languages available](https://github.com/QuentinFuxa/WhisperLiveKit/blob/main/docs/supported_languages.md). If you want to translate to English, you can also use `--direct-english-translation`: the STT model will try to output the translation directly. | `None` |
| `--diarization` | Enable speaker identification | `False` |
| `--backend-policy` | Streaming strategy: `1`/`simulstreaming` uses AlignAtt SimulStreaming, `2`/`localagreement` uses the LocalAgreement policy | `simulstreaming` |
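
For instance, a hypothetical invocation combining the flags above (values illustrative):

```bash
wlk --model base --language fr --direct-english-translation --diarization
```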

View File

@@ -0,0 +1,71 @@
# Alignment between STT Tokens and Diarization Segments
- Example 1: The punctuation from STT and the speaker change from diarization arrive in the same prediction `t`.
- Example 2: The punctuation from STT arrives in prediction `t`, but the speaker change from diarization arrives in prediction `t-1`.
- Example 3: The punctuation from STT arrives in prediction `t-1`, but the speaker change from diarization arrives in prediction `t`.
> `#` is the split between the `t-1` prediction and the `t` prediction.
## Example 1:
```text
punctuations_segments : __#_______.__________________!____
diarization_segments:
SPK1 __#____________
SPK2 # ___________________
-->
ALIGNED SPK1 __#_______.
ALIGNED SPK2 # __________________!____
t-1 output:
SPK1: __#
SPK2: NO
DIARIZATION BUFFER: NO
t output:
SPK1: __#__.
SPK2: __________________!____
DIARIZATION BUFFER: NO
```
## Example 2:
```text
punctuations_segments : _____#__.___________
diarization_segments:
SPK1 ___ #
SPK2 __#______________
-->
ALIGNED SPK1 _____#__.
ALIGNED SPK2 # ___________
t-1 output:
SPK1: ___ #
SPK2:
DIARIZATION BUFFER: __#
t output:
SPK1: __#__.
SPK2: ___________
DIARIZATION BUFFER: NO
```
## Example 3:
```text
punctuations_segments : ___.__#__________
diarization_segments:
SPK1 ______#__
SPK2 # ________
-->
ALIGNED SPK1 ___. #
ALIGNED SPK2 __#__________
t-1 output:
SPK1: ___. #
SPK2:
DIARIZATION BUFFER: __#
t output:
SPK1: #
SPK2: __#___________
DIARIZATION BUFFER: NO
```
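
A minimal sketch of the overlap-based attribution these examples illustrate, using simplified stand-ins for the token and segment types (hypothetical names, not the library's exact classes):

```python
from dataclasses import dataclass

@dataclass
class Token:
    start: float   # seconds, stream time
    end: float
    text: str
    speaker: int = -1  # -1 = not yet attributed

@dataclass
class SpeakerSegment:
    speaker: int
    start: float
    end: float

def assign_speakers(tokens: list[Token], segments: list[SpeakerSegment]) -> list[Token]:
    """Give each token the speaker of the first diarization segment it overlaps."""
    for token in tokens:
        for seg in segments:
            # Intervals [token.start, token.end) and [seg.start, seg.end) intersect
            if not (seg.end <= token.start or seg.start >= token.end):
                token.speaker = seg.speaker
                break
    return tokens
```

Punctuation-aware splitting, as in the examples above, additionally moves segment boundaries to the nearest sentence-final punctuation before attributing tokens.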

View File

@@ -0,0 +1,43 @@
# Technical Integration Guide
This document introduces how to reuse the core components when you do **not** want to ship the bundled frontend, FastAPI server, or even the provided CLI.
---
## 1. Runtime Components
| Layer | File(s) | Purpose |
|-------|---------|---------|
| Transport | `whisperlivekit/basic_server.py`, any ASGI/WebSocket server | Accepts audio over WebSocket (MediaRecorder WebM or raw PCM chunks) and streams JSON updates back |
| Audio processing | `whisperlivekit/audio_processor.py` | Buffers audio, orchestrates transcription, diarization, translation, handles FFmpeg/PCM input |
| Engines | `whisperlivekit/core.py`, `whisperlivekit/simul_whisper/*`, `whisperlivekit/local_agreement/*` | Load models once (SimulStreaming or LocalAgreement), expose `TranscriptionEngine` and helpers |
| Frontends | `whisperlivekit/web/*`, `chrome-extension/*` | Optional UI layers feeding the WebSocket endpoint |
**Key idea:** The server boundary is just `AudioProcessor.process_audio()` for incoming bytes and the async generator returned by `AudioProcessor.create_tasks()` for outgoing updates (`FrontData`). Everything else is optional.
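A compact sketch of that boundary, assuming a shared `engine` built once at startup and an `incoming_chunks` async iterator from your own transport (both hypothetical names):
```python
import asyncio
from whisperlivekit.core import TranscriptionEngine
from whisperlivekit.audio_processor import AudioProcessor

async def handle_connection(engine: TranscriptionEngine, incoming_chunks):
    processor = AudioProcessor(transcription_engine=engine)  # one per client
    results = await processor.create_tasks()  # async generator of FrontData

    async def feed():
        async for chunk in incoming_chunks:   # raw bytes from your transport
            await processor.process_audio(chunk)
        await processor.process_audio(b"")    # empty message starts the stop sequence

    feed_task = asyncio.create_task(feed())
    try:
        async for front_data in results:      # stream updates back to the client
            print(front_data.status, front_data.buffer_transcription)
    finally:
        feed_task.cancel()
        await processor.cleanup()
```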
---
## 2. Running Without the Bundled Frontend
1. Start the server/engine however you like:
```bash
wlk --model small --language en --host 0.0.0.0 --port 9000
# or launch your own app that instantiates TranscriptionEngine(...)
```
2. Build your own client (browser, mobile, desktop) that:
- Opens `ws(s)://<host>:<port>/asr`
- Sends either MediaRecorder/Opus WebM blobs **or** raw PCM (`--pcm-input` on the server tells the client to use the AudioWorklet).
- Consumes the JSON payload defined in `docs/API.md`.
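As an illustration, a minimal raw-PCM client sketch with the `websockets` package (assumes the server runs with `--pcm-input` and 16 kHz s16le mono audio; the file name is hypothetical):
```python
import asyncio
import json
import websockets  # pip install websockets

async def stream_pcm(uri: str = "ws://localhost:9000/asr", path: str = "speech.s16le"):
    async with websockets.connect(uri) as ws:
        async def receive():
            async for message in ws:          # JSON updates per docs/API.md
                update = json.loads(message)
                print(update.get("status"), update.get("buffer_transcription", ""))

        recv_task = asyncio.create_task(receive())
        chunk = 16000 * 2 // 10               # 100 ms of 16 kHz s16le mono
        with open(path, "rb") as f:
            while data := f.read(chunk):
                await ws.send(data)           # binary frames of raw PCM
                await asyncio.sleep(0.1)      # pace roughly like a live source
        await ws.send(b"")                    # empty message: end of stream
        await recv_task                       # runs until the server closes the socket

asyncio.run(stream_pcm())
```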
---
## 3. Running Without FastAPI
`whisperlivekit/basic_server.py` is just an example. Any async framework works, as long as you:
1. Create a global `TranscriptionEngine` (expensive to initialize; reuse it).
2. Instantiate `AudioProcessor(transcription_engine=engine)` for each connection.
3. Call `create_tasks()` to get the async generator, feed incoming bytes to `process_audio()`, and ensure `cleanup()` runs when the client disconnects.
If you prefer to send compressed audio, instantiate `AudioProcessor(pcm_input=False)` and pipe encoded chunks through `FFmpegManager` transparently—just ensure `ffmpeg` is available or be ready to handle the `"ffmpeg_not_found"` error in the streamed `FrontData`.
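For the compressed path, a sketch of the same update loop with FFmpeg decoding and the error surfaced in the stream (`engine` and the `send_json` callback are assumed to come from your own app):
```python
from whisperlivekit.audio_processor import AudioProcessor

async def pump_updates(engine, send_json):
    """Compressed-audio variant: FFmpegManager decodes WebM/Opus internally."""
    processor = AudioProcessor(transcription_engine=engine, pcm_input=False)
    results = await processor.create_tasks()
    async for update in results:              # FrontData updates
        if update.status == "error":          # e.g. FFmpeg missing or failed to start
            await send_json({"status": "error", "detail": update.error})
            break
        await send_json(update)
```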

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "whisperlivekit"
version = "0.2.14"
version = "0.2.15"
description = "Real-time speech-to-text with speaker diarization using Whisper"
readme = "README.md"
authors = [

View File

@@ -1,43 +1,46 @@
import asyncio
import numpy as np
from time import time, sleep
import math
from time import time
import logging
import traceback
from typing import Optional, Union, List, Any, AsyncGenerator
from whisperlivekit.timed_objects import ASRToken, Silence, Line, FrontData, State, Transcript, ChangeSpeaker
from whisperlivekit.core import TranscriptionEngine, online_factory, online_diarization_factory, online_translation_factory
from whisperlivekit.silero_vad_iterator import FixedVADIterator
from whisperlivekit.results_formater import format_output
from whisperlivekit.ffmpeg_manager import FFmpegManager, FFmpegState
from whisperlivekit.tokens_alignment import TokensAlignment
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
SENTINEL = object() # unique sentinel object for end of stream marker
MIN_DURATION_REAL_SILENCE = 5
def cut_at(cumulative_pcm, cut_sec):
cumulative_len = 0
cut_sample = int(cut_sec * 16000)
async def get_all_from_queue(queue: asyncio.Queue) -> Union[object, Silence, np.ndarray, List[Any]]:
items: List[Any] = []
first_item = await queue.get()
queue.task_done()
if first_item is SENTINEL:
return first_item
if isinstance(first_item, Silence):
return first_item
items.append(first_item)
for ind, pcm_array in enumerate(cumulative_pcm):
if (cumulative_len + len(pcm_array)) >= cut_sample:
cut_chunk = cut_sample - cumulative_len
before = np.concatenate(cumulative_pcm[:ind] + [cumulative_pcm[ind][:cut_chunk]])
after = [cumulative_pcm[ind][cut_chunk:]] + cumulative_pcm[ind+1:]
return before, after
cumulative_len += len(pcm_array)
return np.concatenate(cumulative_pcm), []
async def get_all_from_queue(queue):
items = []
try:
while True:
item = queue.get_nowait()
items.append(item)
except asyncio.QueueEmpty:
pass
return items
while True:
if not queue._queue:
break
next_item = queue._queue[0]
if next_item is SENTINEL:
break
if isinstance(next_item, Silence):
break
items.append(await queue.get())
queue.task_done()
if isinstance(items[0], np.ndarray):
return np.concatenate(items)
else: #translation
return items
class AudioProcessor:
"""
@@ -45,7 +48,7 @@ class AudioProcessor:
Handles audio processing, state management, and result formatting.
"""
def __init__(self, **kwargs):
def __init__(self, **kwargs: Any) -> None:
"""Initialize the audio processor with configuration, models, and state."""
if 'transcription_engine' in kwargs and isinstance(kwargs['transcription_engine'], TranscriptionEngine):
@@ -64,36 +67,27 @@ class AudioProcessor:
self.is_pcm_input = self.args.pcm_input
# State management
self.is_stopping = False
self.silence = False
self.silence_duration = 0.0
self.state = State()
self.lock = asyncio.Lock()
self.sep = " " # Default separator
self.last_response_content = FrontData()
self.last_detected_speaker = None
self.speaker_languages = {}
self.diarization_before_transcription = False
self.is_stopping: bool = False
self.current_silence: Optional[Silence] = None
self.state: State = State()
self.lock: asyncio.Lock = asyncio.Lock()
self.sep: str = " " # Default separator
self.last_response_content: FrontData = FrontData()
self.segments = []
self.tokens_alignment: TokensAlignment = TokensAlignment(self.state, self.args, self.sep)
self.beg_loop: Optional[float] = None
if self.diarization_before_transcription:
self.cumulative_pcm = []
self.last_start = 0.0
self.last_end = 0.0
# Models and processing
self.asr = models.asr
self.vac_model = models.vac_model
self.asr: Any = models.asr
self.vac_model: Any = models.vac_model
if self.args.vac:
self.vac = FixedVADIterator(models.vac_model)
self.vac: Optional[FixedVADIterator] = FixedVADIterator(models.vac_model)
else:
self.vac = None
self.vac: Optional[FixedVADIterator] = None
self.ffmpeg_manager = None
self.ffmpeg_reader_task = None
self._ffmpeg_error = None
self.ffmpeg_manager: Optional[FFmpegManager] = None
self.ffmpeg_reader_task: Optional[asyncio.Task] = None
self._ffmpeg_error: Optional[str] = None
if not self.is_pcm_input:
self.ffmpeg_manager = FFmpegManager(
@@ -105,20 +99,20 @@ class AudioProcessor:
self._ffmpeg_error = error_type
self.ffmpeg_manager.on_error_callback = handle_ffmpeg_error
self.transcription_queue = asyncio.Queue() if self.args.transcription else None
self.diarization_queue = asyncio.Queue() if self.args.diarization else None
self.translation_queue = asyncio.Queue() if self.args.target_language else None
self.pcm_buffer = bytearray()
self.transcription_task = None
self.diarization_task = None
self.translation_task = None
self.watchdog_task = None
self.all_tasks_for_cleanup = []
self.transcription_queue: Optional[asyncio.Queue] = asyncio.Queue() if self.args.transcription else None
self.diarization_queue: Optional[asyncio.Queue] = asyncio.Queue() if self.args.diarization else None
self.translation_queue: Optional[asyncio.Queue] = asyncio.Queue() if self.args.target_language else None
self.pcm_buffer: bytearray = bytearray()
self.total_pcm_samples: int = 0
self.transcription_task: Optional[asyncio.Task] = None
self.diarization_task: Optional[asyncio.Task] = None
self.translation_task: Optional[asyncio.Task] = None
self.watchdog_task: Optional[asyncio.Task] = None
self.all_tasks_for_cleanup: List[asyncio.Task] = []
self.transcription = None
self.translation = None
self.diarization = None
self.transcription: Optional[Any] = None
self.translation: Optional[Any] = None
self.diarization: Optional[Any] = None
if self.args.transcription:
self.transcription = online_factory(self.args, models.asr)
@@ -128,27 +122,67 @@ class AudioProcessor:
if models.translation_model:
self.translation = online_translation_factory(self.args, models.translation_model)
def convert_pcm_to_float(self, pcm_buffer):
async def _push_silence_event(self) -> None:
if self.transcription_queue:
await self.transcription_queue.put(self.current_silence)
if self.args.diarization and self.diarization_queue:
await self.diarization_queue.put(self.current_silence)
if self.translation_queue:
await self.translation_queue.put(self.current_silence)
async def _begin_silence(self) -> None:
if self.current_silence:
return
now = time() - self.beg_loop
self.current_silence = Silence(
is_starting=True, start=now
)
await self._push_silence_event()
async def _end_silence(self) -> None:
if not self.current_silence:
return
now = time() - self.beg_loop
self.current_silence.end = now
self.current_silence.is_starting=False
self.current_silence.has_ended=True
self.current_silence.compute_duration()
if self.current_silence.duration > MIN_DURATION_REAL_SILENCE:
self.state.new_tokens.append(self.current_silence)
await self._push_silence_event()
self.current_silence = None
async def _enqueue_active_audio(self, pcm_chunk: np.ndarray) -> None:
if pcm_chunk is None or pcm_chunk.size == 0:
return
if self.transcription_queue:
await self.transcription_queue.put(pcm_chunk.copy())
if self.args.diarization and self.diarization_queue:
await self.diarization_queue.put(pcm_chunk.copy())
def _slice_before_silence(self, pcm_array: np.ndarray, chunk_sample_start: int, silence_sample: Optional[int]) -> Optional[np.ndarray]:
if silence_sample is None:
return None
relative_index = int(silence_sample - chunk_sample_start)
if relative_index <= 0:
return None
split_index = min(relative_index, len(pcm_array))
if split_index <= 0:
return None
return pcm_array[:split_index]
def convert_pcm_to_float(self, pcm_buffer: Union[bytes, bytearray]) -> np.ndarray:
"""Convert PCM buffer in s16le format to normalized NumPy array."""
return np.frombuffer(pcm_buffer, dtype=np.int16).astype(np.float32) / 32768.0
async def add_dummy_token(self):
"""Placeholder token when no transcription is available."""
async with self.lock:
current_time = time() - self.state.beg_loop
self.state.tokens.append(ASRToken(
start=current_time, end=current_time + 1,
text=".", speaker=-1, is_dummy=True
))
async def get_current_state(self):
async def get_current_state(self) -> State:
"""Get current state."""
async with self.lock:
current_time = time()
remaining_transcription = 0
if self.state.end_buffer > 0:
remaining_transcription = max(0, round(current_time - self.state.beg_loop - self.state.end_buffer, 1))
remaining_transcription = max(0, round(current_time - self.beg_loop - self.state.end_buffer, 1))
remaining_diarization = 0
if self.state.tokens:
@@ -160,7 +194,7 @@ class AudioProcessor:
return self.state
async def ffmpeg_stdout_reader(self):
async def ffmpeg_stdout_reader(self) -> None:
"""Read audio data from FFmpeg stdout and process it into the PCM pipeline."""
beg = time()
while True:
@@ -203,50 +237,60 @@ class AudioProcessor:
await asyncio.sleep(0.2)
logger.info("FFmpeg stdout processing finished. Signaling downstream processors if needed.")
if not self.diarization_before_transcription and self.transcription_queue:
if self.transcription_queue:
await self.transcription_queue.put(SENTINEL)
if self.diarization:
await self.diarization_queue.put(SENTINEL)
if self.translation:
await self.translation_queue.put(SENTINEL)
async def transcription_processor(self):
async def transcription_processor(self) -> None:
"""Process audio chunks for transcription."""
cumulative_pcm_duration_stream_time = 0.0
while True:
try:
item = await self.transcription_queue.get()
# item = await self.transcription_queue.get()
item = await get_all_from_queue(self.transcription_queue)
if item is SENTINEL:
logger.debug("Transcription processor received sentinel. Finishing.")
self.transcription_queue.task_done()
break
asr_internal_buffer_duration_s = len(getattr(self.transcription, 'audio_buffer', [])) / self.transcription.SAMPLING_RATE
transcription_lag_s = max(0.0, time() - self.state.beg_loop - self.state.end_buffer)
transcription_lag_s = max(0.0, time() - self.beg_loop - self.state.end_buffer)
asr_processing_logs = f"internal_buffer={asr_internal_buffer_duration_s:.2f}s | lag={transcription_lag_s:.2f}s |"
if type(item) is Silence:
asr_processing_logs += f" + Silence of = {item.duration:.2f}s"
stream_time_end_of_current_pcm = cumulative_pcm_duration_stream_time
new_tokens = []
current_audio_processed_upto = self.state.end_buffer
if isinstance(item, Silence):
if item.is_starting:
new_tokens, current_audio_processed_upto = await asyncio.to_thread(
self.transcription.start_silence
)
asr_processing_logs += f" + Silence starting"
if item.has_ended:
asr_processing_logs += f" + Silence of = {item.duration:.2f}s"
cumulative_pcm_duration_stream_time += item.duration
current_audio_processed_upto = cumulative_pcm_duration_stream_time
self.transcription.end_silence(item.duration, self.state.tokens[-1].end if self.state.tokens else 0)
if self.state.tokens:
asr_processing_logs += f" | last_end = {self.state.tokens[-1].end} |"
logger.info(asr_processing_logs)
cumulative_pcm_duration_stream_time += item.duration
self.transcription.insert_silence(item.duration, self.state.tokens[-1].end if self.state.tokens else 0)
continue
new_tokens = new_tokens or []
current_audio_processed_upto = max(current_audio_processed_upto, stream_time_end_of_current_pcm)
elif isinstance(item, ChangeSpeaker):
self.transcription.new_speaker(item)
continue
elif isinstance(item, np.ndarray):
pcm_array = item
logger.info(asr_processing_logs)
duration_this_chunk = len(pcm_array) / self.sample_rate
cumulative_pcm_duration_stream_time += duration_this_chunk
stream_time_end_of_current_pcm = cumulative_pcm_duration_stream_time
logger.info(asr_processing_logs)
cumulative_pcm_duration_stream_time += len(pcm_array) / self.sample_rate
stream_time_end_of_current_pcm = cumulative_pcm_duration_stream_time
self.transcription.insert_audio_chunk(pcm_array, stream_time_end_of_current_pcm)
new_tokens, current_audio_processed_upto = await asyncio.to_thread(self.transcription.process_iter)
new_tokens = new_tokens or []
self.transcription.insert_audio_chunk(pcm_array, stream_time_end_of_current_pcm)
new_tokens, current_audio_processed_upto = await asyncio.to_thread(self.transcription.process_iter)
_buffer_transcript = self.transcription.get_buffer()
buffer_text = _buffer_transcript.text
@@ -269,13 +313,12 @@ class AudioProcessor:
self.state.tokens.extend(new_tokens)
self.state.buffer_transcription = _buffer_transcript
self.state.end_buffer = max(candidate_end_times)
self.state.new_tokens.extend(new_tokens)
self.state.new_tokens_buffer = _buffer_transcript
if self.translation_queue:
for token in new_tokens:
await self.translation_queue.put(token)
self.transcription_queue.task_done()
await self.translation_queue.put(token)
except Exception as e:
logger.warning(f"Exception in transcription_processor: {e}")
logger.warning(f"Traceback: {traceback.format_exc()}")
@@ -292,124 +335,57 @@ class AudioProcessor:
logger.info("Transcription processor task finished.")
async def diarization_processor(self, diarization_obj):
"""Process audio chunks for speaker diarization."""
if self.diarization_before_transcription:
self.current_speaker = 0
await self.transcription_queue.put(ChangeSpeaker(speaker=self.current_speaker, start=0.0))
async def diarization_processor(self) -> None:
while True:
try:
item = await self.diarization_queue.get()
item = await get_all_from_queue(self.diarization_queue)
if item is SENTINEL:
logger.debug("Diarization processor received sentinel. Finishing.")
self.diarization_queue.task_done()
break
elif type(item) is Silence:
diarization_obj.insert_silence(item.duration)
if item.has_ended:
self.diarization.insert_silence(item.duration)
continue
elif isinstance(item, np.ndarray):
pcm_array = item
else:
raise Exception('item should be pcm_array')
# Process diarization
await diarization_obj.diarize(pcm_array)
if self.diarization_before_transcription:
segments = diarization_obj.get_segments()
self.cumulative_pcm.append(pcm_array)
if segments:
last_segment = segments[-1]
if last_segment.speaker != self.current_speaker:
cut_sec = last_segment.start - self.last_end
to_transcript, self.cumulative_pcm = cut_at(self.cumulative_pcm, cut_sec)
await self.transcription_queue.put(to_transcript)
self.current_speaker = last_segment.speaker
await self.transcription_queue.put(ChangeSpeaker(speaker=self.current_speaker, start=last_segment.start))
cut_sec = last_segment.end - last_segment.start
to_transcript, self.cumulative_pcm = cut_at(self.cumulative_pcm, cut_sec)
await self.transcription_queue.put(to_transcript)
self.last_start = last_segment.start
self.last_end = last_segment.end
else:
cut_sec = last_segment.end - self.last_end
to_transcript, self.cumulative_pcm = cut_at(self.cumulative_pcm, cut_sec)
await self.transcription_queue.put(to_transcript)
self.last_end = last_segment.end
elif not self.diarization_before_transcription:
async with self.lock:
self.state.tokens = diarization_obj.assign_speakers_to_tokens(
self.state.tokens,
use_punctuation_split=self.args.punctuation_split
)
if len(self.state.tokens) > 0:
self.state.end_attributed_speaker = max(self.state.tokens[-1].end, self.state.end_attributed_speaker)
self.diarization_queue.task_done()
self.diarization.insert_audio_chunk(item)
diarization_segments = await self.diarization.diarize()
self.state.new_diarization = diarization_segments
except Exception as e:
logger.warning(f"Exception in diarization_processor: {e}")
logger.warning(f"Traceback: {traceback.format_exc()}")
if 'pcm_array' in locals() and pcm_array is not SENTINEL:
self.diarization_queue.task_done()
logger.info("Diarization processor task finished.")
async def translation_processor(self):
async def translation_processor(self) -> None:
# the idea is to ignore diarization for the moment. We use only transcription tokens.
# And the speaker is attributed given the segments used for the translation
# in the future we want to have different languages for each speaker etc, so it will be more complex.
while True:
try:
item = await self.translation_queue.get() #block until at least 1 token
item = await get_all_from_queue(self.translation_queue)
if item is SENTINEL:
logger.debug("Translation processor received sentinel. Finishing.")
self.translation_queue.task_done()
break
elif type(item) is Silence:
self.translation.insert_silence(item.duration)
continue
# get all the available tokens for translation. The more words, the more precise
tokens_to_process = [item]
additional_tokens = await get_all_from_queue(self.translation_queue)
sentinel_found = False
for additional_token in additional_tokens:
if additional_token is SENTINEL:
sentinel_found = True
break
elif type(additional_token) is Silence:
self.translation.insert_silence(additional_token.duration)
if item.is_starting:
new_translation, new_translation_buffer = self.translation.validate_buffer_and_reset()
if item.has_ended:
self.translation.insert_silence(item.duration)
continue
else:
tokens_to_process.append(additional_token)
if tokens_to_process:
self.translation.insert_tokens(tokens_to_process)
translation_validated_segments, buffer_translation = await asyncio.to_thread(self.translation.process)
async with self.lock:
self.state.translation_validated_segments = translation_validated_segments
self.state.buffer_translation = buffer_translation
self.translation_queue.task_done()
for _ in additional_tokens:
self.translation_queue.task_done()
if sentinel_found:
logger.debug("Translation processor received sentinel in batch. Finishing.")
break
elif isinstance(item, ChangeSpeaker):
new_translation, new_translation_buffer = self.translation.validate_buffer_and_reset()
pass
else:
self.translation.insert_tokens(item)
new_translation, new_translation_buffer = await asyncio.to_thread(self.translation.process)
async with self.lock:
self.state.new_translation.append(new_translation)
self.state.new_translation_buffer = new_translation_buffer
except Exception as e:
logger.warning(f"Exception in translation_processor: {e}")
logger.warning(f"Traceback: {traceback.format_exc()}")
if 'token' in locals() and item is not SENTINEL:
self.translation_queue.task_done()
if 'additional_tokens' in locals():
for _ in additional_tokens:
self.translation_queue.task_done()
logger.info("Translation processor task finished.")
async def results_formatter(self):
async def results_formatter(self) -> AsyncGenerator[FrontData, None]:
"""Format processing results for output."""
while True:
try:
@@ -419,55 +395,32 @@ class AudioProcessor:
await asyncio.sleep(1)
continue
state = await self.get_current_state()
lines, undiarized_text = format_output(
state,
self.silence,
args = self.args,
sep=self.sep
self.tokens_alignment.update()
lines, buffer_diarization_text, buffer_translation_text = self.tokens_alignment.get_lines(
diarization=self.args.diarization,
translation=bool(self.translation),
current_silence=self.current_silence
)
if lines and lines[-1].speaker == -2:
buffer_transcription = Transcript()
else:
buffer_transcription = state.buffer_transcription
state = await self.get_current_state()
buffer_diarization = ''
if undiarized_text:
buffer_diarization = self.sep.join(undiarized_text)
buffer_transcription_text = state.buffer_transcription.text if state.buffer_transcription else ''
async with self.lock:
self.state.end_attributed_speaker = state.end_attributed_speaker
buffer_translation_text = ''
if state.buffer_translation:
raw_buffer_translation = getattr(state.buffer_translation, 'text', state.buffer_translation)
if raw_buffer_translation:
buffer_translation_text = raw_buffer_translation.strip()
response_status = "active_transcription"
if not state.tokens and not buffer_transcription and not buffer_diarization:
if not lines and not buffer_transcription_text and not buffer_diarization_text:
response_status = "no_audio_detected"
lines = []
elif not lines:
lines = [Line(
speaker=1,
start=state.end_buffer,
end=state.end_buffer
)]
response = FrontData(
status=response_status,
lines=lines,
buffer_transcription=buffer_transcription.text.strip(),
buffer_diarization=buffer_diarization,
buffer_transcription=buffer_transcription_text,
buffer_diarization=buffer_diarization_text,
buffer_translation=buffer_translation_text,
remaining_time_transcription=state.remaining_time_transcription,
remaining_time_diarization=state.remaining_time_diarization if self.args.diarization else 0
)
should_push = (response != self.last_response_content)
if should_push and (lines or buffer_transcription or buffer_diarization or response_status == "no_audio_detected"):
if should_push:
yield response
self.last_response_content = response
@@ -481,17 +434,17 @@ class AudioProcessor:
logger.warning(f"Exception in results_formatter. Traceback: {traceback.format_exc()}")
await asyncio.sleep(0.5)
async def create_tasks(self):
async def create_tasks(self) -> AsyncGenerator[FrontData, None]:
"""Create and start processing tasks."""
self.all_tasks_for_cleanup = []
processing_tasks_for_watchdog = []
processing_tasks_for_watchdog: List[asyncio.Task] = []
# If using FFmpeg (non-PCM input), start it and spawn stdout reader
if not self.is_pcm_input:
success = await self.ffmpeg_manager.start()
if not success:
logger.error("Failed to start FFmpeg manager")
async def error_generator():
async def error_generator() -> AsyncGenerator[FrontData, None]:
yield FrontData(
status="error",
error="FFmpeg failed to start. Please check that FFmpeg is installed."
@@ -507,7 +460,7 @@ class AudioProcessor:
processing_tasks_for_watchdog.append(self.transcription_task)
if self.diarization:
self.diarization_task = asyncio.create_task(self.diarization_processor(self.diarization))
self.diarization_task = asyncio.create_task(self.diarization_processor())
self.all_tasks_for_cleanup.append(self.diarization_task)
processing_tasks_for_watchdog.append(self.diarization_task)
@@ -522,9 +475,9 @@ class AudioProcessor:
return self.results_formatter()
async def watchdog(self, tasks_to_monitor):
async def watchdog(self, tasks_to_monitor: List[asyncio.Task]) -> None:
"""Monitors the health of critical processing tasks."""
tasks_remaining = [task for task in tasks_to_monitor if task]
tasks_remaining: List[asyncio.Task] = [task for task in tasks_to_monitor if task]
while True:
try:
if not tasks_remaining:
@@ -549,7 +502,7 @@ class AudioProcessor:
except Exception as e:
logger.error(f"Error in watchdog task: {e}", exc_info=True)
async def cleanup(self):
async def cleanup(self) -> None:
"""Clean up resources when processing is complete."""
logger.info("Starting cleanup of AudioProcessor resources.")
self.is_stopping = True
@@ -572,7 +525,7 @@ class AudioProcessor:
self.diarization.close()
logger.info("AudioProcessor cleanup complete.")
def _processing_tasks_done(self):
def _processing_tasks_done(self) -> bool:
"""Return True when all active processing tasks have completed."""
tasks_to_check = [
self.transcription_task,
@@ -583,11 +536,13 @@ class AudioProcessor:
return all(task.done() for task in tasks_to_check if task)
async def process_audio(self, message):
async def process_audio(self, message: Optional[bytes]) -> None:
"""Process incoming audio data."""
if not self.state.beg_loop:
self.state.beg_loop = time()
if not self.beg_loop:
self.beg_loop = time()
self.current_silence = Silence(start=0.0, is_starting=True)
self.tokens_alignment.beg_loop = self.beg_loop
if not message:
logger.info("Empty audio message received, initiating stop sequence.")
@@ -620,7 +575,7 @@ class AudioProcessor:
else:
logger.warning("Failed to write audio data to FFmpeg")
async def handle_pcm_data(self):
async def handle_pcm_data(self) -> None:
# Process when enough data
if len(self.pcm_buffer) < self.bytes_per_sec:
return
@@ -639,40 +594,30 @@ class AudioProcessor:
pcm_array = self.convert_pcm_to_float(self.pcm_buffer[:aligned_chunk_size])
self.pcm_buffer = self.pcm_buffer[aligned_chunk_size:]
res = None
end_of_audio = False
silence_buffer = None
num_samples = len(pcm_array)
chunk_sample_start = self.total_pcm_samples
chunk_sample_end = chunk_sample_start + num_samples
res = None
if self.args.vac:
res = self.vac(pcm_array)
if res is not None:
if res.get("end", 0) > res.get("start", 0):
end_of_audio = True
elif self.silence: #end of silence
self.silence = False
silence_buffer = Silence(duration=time() - self.start_silence)
silence_detected = res.get("end", 0) > res.get("start", 0)
if silence_detected and not self.current_silence:
pre_silence_chunk = self._slice_before_silence(
pcm_array, chunk_sample_start, res.get("end")
)
if pre_silence_chunk is not None and pre_silence_chunk.size > 0:
await self._enqueue_active_audio(pre_silence_chunk)
await self._begin_silence()
elif self.current_silence:
await self._end_silence()
if silence_buffer:
if not self.diarization_before_transcription and self.transcription_queue:
await self.transcription_queue.put(silence_buffer)
if self.args.diarization and self.diarization_queue:
await self.diarization_queue.put(silence_buffer)
if self.translation_queue:
await self.translation_queue.put(silence_buffer)
if not self.current_silence:
await self._enqueue_active_audio(pcm_array)
if not self.silence:
if not self.diarization_before_transcription and self.transcription_queue:
await self.transcription_queue.put(pcm_array.copy())
if self.args.diarization and self.diarization_queue:
await self.diarization_queue.put(pcm_array.copy())
self.silence_duration = 0.0
if end_of_audio:
self.silence = True
self.start_silence = time()
self.total_pcm_samples = chunk_sample_end
if not self.args.transcription and not self.args.diarization:
await asyncio.sleep(0.1)

View File

@@ -52,8 +52,8 @@ class TranscriptionEngine:
transcription_common_params = {
"warmup_file": None,
"min_chunk_size": 0.5,
"model_size": "tiny",
"min_chunk_size": 0.1,
"model_size": "base",
"model_cache_dir": None,
"model_dir": None,
"model_path": None,

View File

@@ -26,7 +26,7 @@ class DiarizationObserver(Observer):
"""Observer that logs all data emitted by the diarization pipeline and stores speaker segments."""
def __init__(self):
self.speaker_segments = []
self.diarization_segments = []
self.processed_time = 0
self.segment_lock = threading.Lock()
self.global_time_offset = 0.0
@@ -48,7 +48,7 @@ class DiarizationObserver(Observer):
for speaker, label in annotation._labels.items():
for start, end in zip(label.segments_boundaries_[:-1], label.segments_boundaries_[1:]):
print(f" {speaker}: {start:.2f}s-{end:.2f}s")
self.speaker_segments.append(SpeakerSegment(
self.diarization_segments.append(SpeakerSegment(
speaker=speaker,
start=start + self.global_time_offset,
end=end + self.global_time_offset
@@ -59,14 +59,14 @@ class DiarizationObserver(Observer):
def get_segments(self) -> List[SpeakerSegment]:
"""Get a copy of the current speaker segments."""
with self.segment_lock:
return self.speaker_segments.copy()
return self.diarization_segments.copy()
def clear_old_segments(self, older_than: float = 30.0):
"""Clear segments older than the specified time."""
with self.segment_lock:
current_time = self.processed_time
self.speaker_segments = [
segment for segment in self.speaker_segments
self.diarization_segments = [
segment for segment in self.diarization_segments
if current_time - segment.end < older_than
]
@@ -178,7 +178,6 @@ class DiartDiarization:
self.pipeline = SpeakerDiarization(config=config)
self.observer = DiarizationObserver()
self.lag_diart = None
if use_microphone:
self.source = MicrophoneAudioSource(block_duration=block_duration)
@@ -217,32 +216,6 @@ class DiartDiarization:
if self.custom_source:
self.custom_source.close()
def assign_speakers_to_tokens(self, tokens: list, use_punctuation_split: bool = False) -> float:
"""
Assign speakers to tokens based on timing overlap with speaker segments.
Uses the segments collected by the observer.
If use_punctuation_split is True, uses punctuation marks to refine speaker boundaries.
"""
segments = self.observer.get_segments()
# Debug logging
logger.debug(f"assign_speakers_to_tokens called with {len(tokens)} tokens")
logger.debug(f"Available segments: {len(segments)}")
for i, seg in enumerate(segments[:5]): # Show first 5 segments
logger.debug(f" Segment {i}: {seg.speaker} [{seg.start:.2f}-{seg.end:.2f}]")
if not self.lag_diart and segments and tokens:
self.lag_diart = segments[0].start - tokens[0].start
if not use_punctuation_split:
for token in tokens:
for segment in segments:
if not (segment.end <= token.start + self.lag_diart or segment.start >= token.end + self.lag_diart):
token.speaker = extract_number(segment.speaker) + 1
else:
tokens = add_speaker_to_tokens(segments, tokens)
return tokens
def concatenate_speakers(segments):
segments_concatenated = [{"speaker": 1, "begin": 0.0, "end": 0.0}]

View File

@@ -94,11 +94,11 @@ class SortformerDiarizationOnline:
model_name: Pre-trained model name (default: "nvidia/diar_streaming_sortformer_4spk-v2")
"""
self.sample_rate = sample_rate
self.speaker_segments = []
self.diarization_segments = []
self.diar_segments = []
self.buffer_audio = np.array([], dtype=np.float32)
self.segment_lock = threading.Lock()
self.global_time_offset = 0.0
self.processed_time = 0.0
self.debug = False
self.diar_model = shared_model.diar_model
@@ -155,12 +155,10 @@ class SortformerDiarizationOnline:
)
self.streaming_state.fifo_lengths = torch.zeros((batch_size,), dtype=torch.long, device=device)
self.streaming_state.mean_sil_emb = torch.zeros((batch_size, self.diar_model.sortformer_modules.fc_d_model), device=device)
self.streaming_state.n_sil_frames = torch.zeros((batch_size,), dtype=torch.long, device=device)
# Initialize total predictions tensor
self.streaming_state.n_sil_frames = torch.zeros((batch_size,), dtype=torch.long, device=device)
self.total_preds = torch.zeros((batch_size, 0, self.diar_model.sortformer_modules.n_spk), device=device)
def insert_silence(self, silence_duration: float):
def insert_silence(self, silence_duration: Optional[float]):
"""
Insert silence period by adjusting the global time offset.
@@ -171,248 +169,111 @@ class SortformerDiarizationOnline:
self.global_time_offset += silence_duration
logger.debug(f"Inserted silence of {silence_duration:.2f}s, new offset: {self.global_time_offset:.2f}s")
async def diarize(self, pcm_array: np.ndarray):
def insert_audio_chunk(self, pcm_array: np.ndarray):
if self.debug:
self.audio_buffer.append(pcm_array.copy())
self.buffer_audio = np.concatenate([self.buffer_audio, pcm_array.copy()])
async def diarize(self):
"""
Process audio data for diarization in streaming fashion.
Args:
pcm_array: Audio data as numpy array
"""
try:
if self.debug:
self.audio_buffer.append(pcm_array.copy())
threshold = int(self.chunk_duration_seconds * self.sample_rate)
threshold = int(self.chunk_duration_seconds * self.sample_rate)
if not len(self.buffer_audio) >= threshold:
return []
audio = self.buffer_audio[:threshold]
self.buffer_audio = self.buffer_audio[threshold:]
device = self.diar_model.device
audio_signal_chunk = torch.tensor(audio, device=device).unsqueeze(0)
audio_signal_length_chunk = torch.tensor([audio_signal_chunk.shape[1]], device=device)
processed_signal_chunk, processed_signal_length_chunk = self.audio2mel.get_features(
audio_signal_chunk, audio_signal_length_chunk
)
processed_signal_chunk = processed_signal_chunk.to(device)
processed_signal_length_chunk = processed_signal_length_chunk.to(device)
if self._previous_chunk_features is not None:
to_add = self._previous_chunk_features[:, :, -99:].to(device)
total_features = torch.concat([to_add, processed_signal_chunk], dim=2).to(device)
else:
total_features = processed_signal_chunk.to(device)
self._previous_chunk_features = processed_signal_chunk.to(device)
chunk_feat_seq_t = torch.transpose(total_features, 1, 2).to(device)
with torch.inference_mode():
left_offset = 8 if self._chunk_index > 0 else 0
right_offset = 8
self.buffer_audio = np.concatenate([self.buffer_audio, pcm_array.copy()])
if not len(self.buffer_audio) >= threshold:
return
audio = self.buffer_audio[:threshold]
self.buffer_audio = self.buffer_audio[threshold:]
device = self.diar_model.device
audio_signal_chunk = torch.tensor(audio, device=device).unsqueeze(0)
audio_signal_length_chunk = torch.tensor([audio_signal_chunk.shape[1]], device=device)
processed_signal_chunk, processed_signal_length_chunk = self.audio2mel.get_features(
audio_signal_chunk, audio_signal_length_chunk
)
processed_signal_chunk = processed_signal_chunk.to(device)
processed_signal_length_chunk = processed_signal_length_chunk.to(device)
if self._previous_chunk_features is not None:
to_add = self._previous_chunk_features[:, :, -99:].to(device)
total_features = torch.concat([to_add, processed_signal_chunk], dim=2).to(device)
else:
total_features = processed_signal_chunk.to(device)
self._previous_chunk_features = processed_signal_chunk.to(device)
chunk_feat_seq_t = torch.transpose(total_features, 1, 2).to(device)
with torch.inference_mode():
left_offset = 8 if self._chunk_index > 0 else 0
right_offset = 8
self.streaming_state, self.total_preds = self.diar_model.forward_streaming_step(
processed_signal=chunk_feat_seq_t,
processed_signal_length=torch.tensor([chunk_feat_seq_t.shape[1]]).to(device),
streaming_state=self.streaming_state,
total_preds=self.total_preds,
left_offset=left_offset,
right_offset=right_offset,
)
# Convert predictions to speaker segments
self._process_predictions()
self._chunk_index += 1
except Exception as e:
logger.error(f"Error in diarize: {e}")
raise
# TODO: Handle case when stream ends with partial buffer (accumulated_duration > 0 but < chunk_duration_seconds)
self.streaming_state, self.total_preds = self.diar_model.forward_streaming_step(
processed_signal=chunk_feat_seq_t,
processed_signal_length=torch.tensor([chunk_feat_seq_t.shape[1]]).to(device),
streaming_state=self.streaming_state,
total_preds=self.total_preds,
left_offset=left_offset,
right_offset=right_offset,
)
new_segments = self._process_predictions()
self._chunk_index += 1
return new_segments
def _process_predictions(self):
"""Process model predictions and convert to speaker segments."""
try:
preds_np = self.total_preds[0].cpu().numpy()
active_speakers = np.argmax(preds_np, axis=1)
if self._len_prediction is None:
self._len_prediction = len(active_speakers)
# Get predictions for current chunk
frame_duration = self.chunk_duration_seconds / self._len_prediction
current_chunk_preds = active_speakers[-self._len_prediction:]
with self.segment_lock:
# Process predictions into segments
base_time = self._chunk_index * self.chunk_duration_seconds + self.global_time_offset
for idx, spk in enumerate(current_chunk_preds):
start_time = base_time + idx * frame_duration
end_time = base_time + (idx + 1) * frame_duration
# Check if this continues the last segment or starts a new one
if (self.speaker_segments and
self.speaker_segments[-1].speaker == spk and
abs(self.speaker_segments[-1].end - start_time) < frame_duration * 0.5):
# Continue existing segment
self.speaker_segments[-1].end = end_time
else:
# Create new segment
self.speaker_segments.append(SpeakerSegment(
speaker=spk,
start=start_time,
end=end_time
))
# Update processed time
self.processed_time = max(self.processed_time, base_time + self.chunk_duration_seconds)
logger.debug(f"Processed chunk {self._chunk_index}, total segments: {len(self.speaker_segments)}")
except Exception as e:
logger.error(f"Error processing predictions: {e}")
def assign_speakers_to_tokens(self, tokens: list, use_punctuation_split: bool = False) -> list:
"""
Assign speakers to tokens based on timing overlap with speaker segments.
preds_np = self.total_preds[0].cpu().numpy()
active_speakers = np.argmax(preds_np, axis=1)
Args:
tokens: List of tokens with timing information
use_punctuation_split: Whether to use punctuation for boundary refinement
Returns:
List of tokens with speaker assignments
Last speaker_segment
"""
if self._len_prediction is None:
self._len_prediction = len(active_speakers) #12
frame_duration = self.chunk_duration_seconds / self._len_prediction
current_chunk_preds = active_speakers[-self._len_prediction:]
new_segments = []
with self.segment_lock:
segments = self.speaker_segments.copy()
if not segments or not tokens:
logger.debug("No segments or tokens available for speaker assignment")
return tokens
logger.debug(f"Assigning speakers to {len(tokens)} tokens using {len(segments)} segments")
use_punctuation_split = False
if not use_punctuation_split:
# Simple overlap-based assignment
for token in tokens:
token.speaker = -1 # Default to no speaker
for segment in segments:
# Check for timing overlap
if not (segment.end <= token.start or segment.start >= token.end):
token.speaker = segment.speaker + 1 # Convert to 1-based indexing
break
else:
# Use punctuation-aware assignment (similar to diart_backend)
tokens = self._add_speaker_to_tokens_with_punctuation(segments, tokens)
return tokens
def _add_speaker_to_tokens_with_punctuation(self, segments: List[SpeakerSegment], tokens: list) -> list:
"""
Assign speakers to tokens with punctuation-aware boundary adjustment.
Args:
segments: List of speaker segments
tokens: List of tokens to assign speakers to
Returns:
List of tokens with speaker assignments
"""
punctuation_marks = {'.', '!', '?'}
punctuation_tokens = [token for token in tokens if token.text.strip() in punctuation_marks]
# Convert segments to concatenated format
segments_concatenated = self._concatenate_speakers(segments)
# Adjust segment boundaries based on punctuation
for ind, segment in enumerate(segments_concatenated):
for i, punctuation_token in enumerate(punctuation_tokens):
if punctuation_token.start > segment['end']:
after_length = punctuation_token.start - segment['end']
before_length = segment['end'] - punctuation_tokens[i - 1].end if i > 0 else float('inf')
if before_length > after_length:
segment['end'] = punctuation_token.start
if i < len(punctuation_tokens) - 1 and ind + 1 < len(segments_concatenated):
segments_concatenated[ind + 1]['begin'] = punctuation_token.start
else:
segment['end'] = punctuation_tokens[i - 1].end if i > 0 else segment['end']
if i < len(punctuation_tokens) - 1 and ind - 1 >= 0:
segments_concatenated[ind - 1]['begin'] = punctuation_tokens[i - 1].end
break
# Ensure non-overlapping tokens
last_end = 0.0
for token in tokens:
start = max(last_end + 0.01, token.start)
token.start = start
token.end = max(start, token.end)
last_end = token.end
# Assign speakers based on adjusted segments
ind_last_speaker = 0
for segment in segments_concatenated:
for i, token in enumerate(tokens[ind_last_speaker:]):
if token.end <= segment['end']:
token.speaker = segment['speaker']
ind_last_speaker = i + 1
elif token.start > segment['end']:
break
return tokens
def _concatenate_speakers(self, segments: List[SpeakerSegment]) -> List[dict]:
"""
Concatenate consecutive segments from the same speaker.
Args:
segments: List of speaker segments
Returns:
List of concatenated speaker segments
"""
if not segments:
return []
segments_concatenated = [{"speaker": segments[0].speaker + 1, "begin": segments[0].start, "end": segments[0].end}]
for segment in segments[1:]:
speaker = segment.speaker + 1
if segments_concatenated[-1]['speaker'] != speaker:
segments_concatenated.append({"speaker": speaker, "begin": segment.start, "end": segment.end})
else:
segments_concatenated[-1]['end'] = segment.end
base_time = self._chunk_index * self.chunk_duration_seconds + self.global_time_offset
current_spk = current_chunk_preds[0]
start_time = round(base_time, 2)
for idx, spk in enumerate(current_chunk_preds):
current_time = round(base_time + idx * frame_duration, 2)
if spk != current_spk:
new_segments.append(SpeakerSegment(
speaker=current_spk,
start=start_time,
end=current_time
))
start_time = current_time
current_spk = spk
new_segments.append(
SpeakerSegment(
speaker=current_spk,
start=start_time,
end=current_time
)
)
return new_segments
return segments_concatenated
def get_segments(self) -> List[SpeakerSegment]:
"""Get a copy of the current speaker segments."""
with self.segment_lock:
return self.speaker_segments.copy()
def clear_old_segments(self, older_than: float = 30.0):
"""Clear segments older than the specified time."""
with self.segment_lock:
current_time = self.processed_time
self.speaker_segments = [
segment for segment in self.speaker_segments
if current_time - segment.end < older_than
]
logger.debug(f"Cleared old segments, remaining: {len(self.speaker_segments)}")
return self.diarization_segments.copy()
def close(self):
"""Close the diarization system and clean up resources."""
logger.info("Closing SortformerDiarization")
with self.segment_lock:
self.speaker_segments.clear()
self.diarization_segments.clear()
if self.debug:
concatenated_audio = np.concatenate(self.audio_buffer)
@@ -438,7 +299,7 @@ if __name__ == '__main__':
async def main():
"""TEST ONLY."""
an4_audio = 'audio_test.mp3'
an4_audio = 'diarization_audio.wav'
signal, sr = librosa.load(an4_audio, sr=16000)
signal = signal[:16000*30]
@@ -450,13 +311,15 @@ if __name__ == '__main__':
print("Speaker 0: 0:25 - 0:30")
print("=" * 50)
diarization = SortformerDiarization(sample_rate=16000)
diarization_backend = SortformerDiarization()
diarization = SortformerDiarizationOnline(shared_model = diarization_backend)
chunk_size = 1600
for i in range(0, len(signal), chunk_size):
chunk = signal[i:i+chunk_size]
await diarization.diarize(chunk)
new_segments = await diarization.diarize(chunk)
print(f"Processed chunk {i // chunk_size + 1}")
print(new_segments)
segments = diarization.get_segments()
print("\nDiarization results:")

View File

@@ -1,205 +0,0 @@
import numpy as np
import torch
import logging
from nemo.collections.asr.models import SortformerEncLabelModel
from nemo.collections.asr.modules import AudioToMelSpectrogramPreprocessor
import librosa
logger = logging.getLogger(__name__)
def load_model():
diar_model = SortformerEncLabelModel.from_pretrained("nvidia/diar_streaming_sortformer_4spk-v2")
diar_model.eval()
if torch.cuda.is_available():
diar_model.to(torch.device("cuda"))
#we target 1 second lag for the moment. chunk_len could be reduced.
diar_model.sortformer_modules.chunk_len = 10
diar_model.sortformer_modules.subsampling_factor = 10 #8 would be better ideally
diar_model.sortformer_modules.chunk_right_context = 0 #no.
diar_model.sortformer_modules.chunk_left_context = 10 #big so it compensiate the problem with no padding later.
diar_model.sortformer_modules.spkcache_len = 188
diar_model.sortformer_modules.fifo_len = 188
diar_model.sortformer_modules.spkcache_update_period = 144
diar_model.sortformer_modules.log = False
diar_model.sortformer_modules._check_streaming_parameters()
audio2mel = AudioToMelSpectrogramPreprocessor(
window_size= 0.025,
normalize="NA",
n_fft=512,
features=128,
pad_to=0) #pad_to 16 works better than 0. On test audio, we detect a third speaker for 1 second with pad_to=0. To solve that : increase left context to 10.
return diar_model, audio2mel
diar_model, audio2mel = load_model()
class StreamingSortformerState:
"""
This class creates a class instance that will be used to store the state of the
streaming Sortformer model.
Attributes:
spkcache (torch.Tensor): Speaker cache to store embeddings from start
spkcache_lengths (torch.Tensor): Lengths of the speaker cache
spkcache_preds (torch.Tensor): The speaker predictions for the speaker cache parts
fifo (torch.Tensor): FIFO queue to save the embedding from the latest chunks
fifo_lengths (torch.Tensor): Lengths of the FIFO queue
fifo_preds (torch.Tensor): The speaker predictions for the FIFO queue parts
spk_perm (torch.Tensor): Speaker permutation information for the speaker cache
mean_sil_emb (torch.Tensor): Mean silence embedding
n_sil_frames (torch.Tensor): Number of silence frames
"""
spkcache = None # Speaker cache to store embeddings from start
spkcache_lengths = None #
spkcache_preds = None # speaker cache predictions
fifo = None # to save the embedding from the latest chunks
fifo_lengths = None
fifo_preds = None
spk_perm = None
mean_sil_emb = None
n_sil_frames = None
def init_streaming_state(self, batch_size: int = 1, async_streaming: bool = False, device: torch.device = None):
"""
Initializes StreamingSortformerState with empty tensors or zero-valued tensors.
Args:
batch_size (int): Batch size for tensors in streaming state
async_streaming (bool): True for asynchronous update, False for synchronous update
device (torch.device): Device for tensors in streaming state
Returns:
streaming_state (SortformerStreamingState): initialized streaming state
"""
streaming_state = StreamingSortformerState()
if async_streaming:
streaming_state.spkcache = torch.zeros((batch_size, self.spkcache_len, self.fc_d_model), device=device)
streaming_state.spkcache_preds = torch.zeros((batch_size, self.spkcache_len, self.n_spk), device=device)
streaming_state.spkcache_lengths = torch.zeros((batch_size,), dtype=torch.long, device=device)
streaming_state.fifo = torch.zeros((batch_size, self.fifo_len, self.fc_d_model), device=device)
streaming_state.fifo_lengths = torch.zeros((batch_size,), dtype=torch.long, device=device)
else:
streaming_state.spkcache = torch.zeros((batch_size, 0, self.fc_d_model), device=device)
streaming_state.fifo = torch.zeros((batch_size, 0, self.fc_d_model), device=device)
streaming_state.mean_sil_emb = torch.zeros((batch_size, self.fc_d_model), device=device)
streaming_state.n_sil_frames = torch.zeros((batch_size,), dtype=torch.long, device=device)
return streaming_state
def process_diarization(chunks):
"""
what it does:
1. Preprocessing: Applies dithering and pre-emphasis (high-pass filter) if enabled
2. STFT: Computes the Short-Time Fourier Transform using:
- the window of window_size=0.025 --> size of a window : 400 samples
- the hop parameter : n_window_stride = 0.01 -> every 160 samples, a new window
3. Magnitude Calculation: Converts complex STFT output to magnitude spectrogram
4. Mel Conversion: Applies Mel filterbanks (128 filters in this case) to get Mel spectrogram
5. Logarithm: Takes the log of the Mel spectrogram (if `log=True`)
6. Normalization: Skips normalization since `normalize="NA"`
7. Padding: Pads the time dimension to a multiple of `pad_to` (default 16)
"""
previous_chunk = None
l_chunk_feat_seq_t = []
for chunk in chunks:
audio_signal_chunk = torch.tensor(chunk).unsqueeze(0).to(diar_model.device)
audio_signal_length_chunk = torch.tensor([audio_signal_chunk.shape[1]]).to(diar_model.device)
processed_signal_chunk, processed_signal_length_chunk = audio2mel.get_features(audio_signal_chunk, audio_signal_length_chunk)
if previous_chunk is not None:
to_add = previous_chunk[:, :, -99:]
total = torch.concat([to_add, processed_signal_chunk], dim=2)
else:
total = processed_signal_chunk
previous_chunk = processed_signal_chunk
l_chunk_feat_seq_t.append(torch.transpose(total, 1, 2))
batch_size = 1
streaming_state = init_streaming_state(diar_model.sortformer_modules,
batch_size = batch_size,
async_streaming = True,
device = diar_model.device
)
total_preds = torch.zeros((batch_size, 0, diar_model.sortformer_modules.n_spk), device=diar_model.device)
chunk_duration_seconds = diar_model.sortformer_modules.chunk_len * diar_model.sortformer_modules.subsampling_factor * diar_model.preprocessor._cfg.window_stride
l_speakers = [
{'start_time': 0,
'end_time': 0,
'speaker': 0
}
]
len_prediction = None
left_offset = 0
right_offset = 8
for i, chunk_feat_seq_t in enumerate(l_chunk_feat_seq_t):
with torch.inference_mode():
streaming_state, total_preds = diar_model.forward_streaming_step(
processed_signal=chunk_feat_seq_t,
processed_signal_length=torch.tensor([chunk_feat_seq_t.shape[1]]),
streaming_state=streaming_state,
total_preds=total_preds,
left_offset=left_offset,
right_offset=right_offset,
)
left_offset = 8
preds_np = total_preds[0].cpu().numpy()
active_speakers = np.argmax(preds_np, axis=1)
if len_prediction is None:
len_prediction = len(active_speakers) # we want to get the len of 1 prediction
frame_duration = chunk_duration_seconds / len_prediction
active_speakers = active_speakers[-len_prediction:]
for idx, spk in enumerate(active_speakers):
if spk != l_speakers[-1]['speaker']:
l_speakers.append(
{'start_time': (i * chunk_duration_seconds + idx * frame_duration),
'end_time': (i * chunk_duration_seconds + (idx + 1) * frame_duration),
'speaker': spk
})
else:
l_speakers[-1]['end_time'] = i * chunk_duration_seconds + (idx + 1) * frame_duration
"""
Should print
[{'start_time': 0, 'end_time': 8.72, 'speaker': 0},
{'start_time': 8.72, 'end_time': 18.88, 'speaker': 1},
{'start_time': 18.88, 'end_time': 24.96, 'speaker': 2},
{'start_time': 24.96, 'end_time': 31.68, 'speaker': 0}]
"""
for speaker in l_speakers:
print(f"Speaker {speaker['speaker']}: {speaker['start_time']:.2f}s - {speaker['end_time']:.2f}s")
if __name__ == '__main__':
an4_audio = 'audio_test.mp3'
signal, sr = librosa.load(an4_audio, sr=16000)
signal = signal[:16000*30]
# signal = signal[:-(len(signal)%16000)]
print("\n" + "=" * 50)
print("Expected ground truth:")
print("Speaker 0: 0:00 - 0:09")
print("Speaker 1: 0:09 - 0:19")
print("Speaker 2: 0:19 - 0:25")
print("Speaker 0: 0:25 - 0:30")
print("=" * 50)
chunk_size = 16000 # 1 second
chunks = []
for i in range(0, len(signal), chunk_size):
chunk = signal[i:i+chunk_size]
chunks.append(chunk)
process_diarization(chunks)

View File

@@ -224,7 +224,8 @@ class MLXWhisper(ASRBase):
if segment.get("no_speech_prob", 0) > 0.9:
continue
for word in segment.get("words", []):
token = ASRToken(word["start"], word["end"], word["word"], probability=word["probability"])
probability = word["probability"]  # still read here, but no longer stored on the token
token = ASRToken(word["start"], word["end"], word["word"])
tokens.append(token)
return tokens

View File

@@ -151,21 +151,32 @@ class OnlineASRProcessor:
"""Append an audio chunk (a numpy array) to the current audio buffer."""
self.audio_buffer = np.append(self.audio_buffer, audio)
def insert_silence(self, silence_duration, offset):
"""
If silences are > 5s, we do a complete context clear. Otherwise, we just insert a small silence and shift the last_attend_frame
"""
# if self.transcript_buffer.buffer:
# self.committed.extend(self.transcript_buffer.buffer)
# self.transcript_buffer.buffer = []
if True: #silence_duration < 3: # we want the trailing audio to be processed so there is no gap; could also be handled in ends_with_silence in the future.
gap_silence = np.zeros(int(16000 * silence_duration), dtype=np.int16)
self.insert_audio_chunk(gap_silence)
def start_silence(self):
if self.audio_buffer.size == 0:
return [], self.get_audio_buffer_end_time()
return self.process_iter()
def end_silence(self, silence_duration: Optional[float], offset: float):
if not silence_duration or silence_duration <= 0:
return
long_silence = silence_duration >= 5
if not long_silence:
gap_samples = int(self.SAMPLING_RATE * silence_duration)
if gap_samples > 0:
gap_silence = np.zeros(gap_samples, dtype=np.float32)
self.insert_audio_chunk(gap_silence)
else:
self.init(offset=silence_duration + offset)
self.global_time_offset += silence_duration
def insert_silence(self, silence_duration, offset):
"""
Backwards compatibility shim for legacy callers that still use insert_silence.
"""
self.end_silence(silence_duration, offset)
def prompt(self) -> Tuple[str, str]:
"""
Returns a tuple: (prompt, context), where:
@@ -400,11 +411,11 @@ class OnlineASRProcessor:
) -> Transcript:
sep = sep if sep is not None else self.asr.sep
text = sep.join(token.text for token in tokens)
probability = sum(token.probability for token in tokens if token.probability) / len(tokens) if tokens else None
# probability = sum(token.probability for token in tokens if token.probability) / len(tokens) if tokens else None
if tokens:
start = offset + tokens[0].start
end = offset + tokens[-1].end
else:
start = None
end = None
return Transcript(start, end, text, probability=probability)
return Transcript(start, end, text)
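To make the branching in `end_silence` concrete, here is a standalone sketch of the same decision (not the class itself; the 5 s threshold mirrors the `long_silence` test above):

import numpy as np

SAMPLING_RATE = 16000
LONG_SILENCE_S = 5

def end_silence_sketch(audio_buffer: np.ndarray, silence_duration: float):
    """Short gaps are re-inserted as zero samples so the timeline stays
    continuous; anything >= 5 s drops the buffer and signals a reset."""
    if silence_duration < LONG_SILENCE_S:
        gap = np.zeros(int(SAMPLING_RATE * silence_duration), dtype=np.float32)
        return np.append(audio_buffer, gap), False
    return np.array([], dtype=np.float32), True

buf, reset = end_silence_sketch(np.ones(1600, dtype=np.float32), 0.5)
print(len(buf), reset)   # 9600 False -> 0.5 s of zeros appended
buf, reset = end_silence_sketch(buf, 7.0)
print(len(buf), reset)   # 0 True -> long silence clears the context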

View File

@@ -81,14 +81,14 @@ def parse_args():
parser.add_argument(
"--min-chunk-size",
type=float,
default=0.5,
default=0.1,
help="Minimum audio chunk size in seconds. It waits up to this time to do processing. If the processing takes shorter time, it waits, otherwise it processes the whole segment that was received by this time.",
)
parser.add_argument(
"--model",
type=str,
default="small",
default="base",
dest='model_size',
help="Name size of the Whisper model to use (default: tiny). Suggested values: tiny.en,tiny,base.en,base,small.en,small,medium.en,medium,large-v1,large-v2,large-v3,large,large-v3-turbo. The model is automatically downloaded from the model hub if not present in model cache dir.",
)

View File

@@ -1,106 +0,0 @@
from whisperlivekit.timed_objects import ASRToken
from time import time
import re
MIN_SILENCE_DURATION = 4  # in seconds
END_SILENCE_DURATION = 8  # in seconds. Keep this large to avoid false positives when model lag is high
END_SILENCE_DURATION_VAC = 3  # VAC is good at detecting silences, but we want to skip the smallest ones
def blank_to_silence(tokens):
full_string = ''.join([t.text for t in tokens])
patterns = [re.compile(r'(?:\s*\[BLANK_AUDIO\]\s*)+'), re.compile(r'(?:\s*\[typing\]\s*)+')]
matches = []
for pattern in patterns:
for m in pattern.finditer(full_string):
matches.append({
'start': m.start(),
'end': m.end()
})
if matches:
# cleaned = pattern.sub(' ', full_string).strip()
# print("Cleaned:", cleaned)
cumulated_len = 0
silence_token = None
cleaned_tokens = []
for token in tokens:
if matches:
start = cumulated_len
end = cumulated_len + len(token.text)
cumulated_len = end
if start >= matches[0]['start'] and end <= matches[0]['end']:
if silence_token: #previous token was already silence
silence_token.start = min(silence_token.start, token.start)
silence_token.end = max(silence_token.end, token.end)
else: #new silence
silence_token = ASRToken(
start=token.start,
end=token.end,
speaker=-2,
probability=0.95
)
else:
if silence_token: #there was silence but no more
if silence_token.duration() >= MIN_SILENCE_DURATION:
cleaned_tokens.append(
silence_token
)
silence_token = None
matches.pop(0)
cleaned_tokens.append(token)
# print(cleaned_tokens)
return cleaned_tokens
return tokens
def no_token_to_silence(tokens):
new_tokens = []
silence_token = None
for token in tokens:
if token.speaker == -2:
if new_tokens and new_tokens[-1].speaker == -2: #if token is silence and previous one too
new_tokens[-1].end = token.end
else:
new_tokens.append(token)
last_end = new_tokens[-1].end if new_tokens else 0.0
if token.start - last_end >= MIN_SILENCE_DURATION: #if token is not silence but important gap
if new_tokens and new_tokens[-1].speaker == -2:
new_tokens[-1].end = token.start
else:
silence_token = ASRToken(
start=last_end,
end=token.start,
speaker=-2,
probability=0.95
)
new_tokens.append(silence_token)
if token.speaker != -2:
new_tokens.append(token)
return new_tokens
def ends_with_silence(tokens, beg_loop, vac_detected_silence):
current_time = time() - (beg_loop if beg_loop else 0.0)
last_token = tokens[-1]
if vac_detected_silence or (current_time - last_token.end >= END_SILENCE_DURATION):
if last_token.speaker == -2:
last_token.end = current_time
else:
tokens.append(
ASRToken(
start=tokens[-1].end,
end=current_time,
speaker=-2,
probability=0.95
)
)
return tokens
def handle_silences(tokens, beg_loop, vac_detected_silence):
if not tokens:
return []
tokens = blank_to_silence(tokens) #useful for simulstreaming backend which tends to generate [BLANK_AUDIO] text
tokens = no_token_to_silence(tokens)
tokens = ends_with_silence(tokens, beg_loop, vac_detected_silence)
return tokens

View File

@@ -1,153 +0,0 @@
import logging
from whisperlivekit.remove_silences import handle_silences
from whisperlivekit.timed_objects import Line, format_time
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
CHECK_AROUND = 4
DEBUG = False
def is_punctuation(token):
if token.is_punctuation():
return True
return False
def next_punctuation_change(i, tokens):
for ind in range(i+1, min(len(tokens), i+CHECK_AROUND+1)):
if is_punctuation(tokens[ind]):
return ind
return None
def next_speaker_change(i, tokens, speaker):
for ind in range(i-1, max(0, i-CHECK_AROUND)-1, -1):
token = tokens[ind]
if is_punctuation(token):
break
if token.speaker != speaker:
return ind, token.speaker
return None, speaker
def new_line(
token,
):
return Line(
speaker = token.corrected_speaker,
text = token.text + (f"[{format_time(token.start)} : {format_time(token.end)}]" if DEBUG else ""),
start = token.start,
end = token.end,
detected_language=token.detected_language
)
def append_token_to_last_line(lines, sep, token):
if not lines:
lines.append(new_line(token))
else:
if token.text:
lines[-1].text += sep + token.text + (f"[{format_time(token.start)} : {format_time(token.end)}]" if DEBUG else "")
lines[-1].end = token.end
if not lines[-1].detected_language and token.detected_language:
lines[-1].detected_language = token.detected_language
def format_output(state, silence, args, sep):
diarization = args.diarization
disable_punctuation_split = args.disable_punctuation_split
tokens = state.tokens
translation_validated_segments = state.translation_validated_segments # Here we will attribute the speakers only based on the timestamps of the segments
last_validated_token = state.last_validated_token
previous_speaker = 1
undiarized_text = []
tokens = handle_silences(tokens, state.beg_loop, silence)
for i in range(last_validated_token, len(tokens)):
token = tokens[i]
speaker = int(token.speaker)
token.corrected_speaker = speaker
if not diarization:
if speaker == -1: #Speaker -1 means not attributed by diarization. In the frontend, it should appear under 'Speaker 1'
token.corrected_speaker = 1
token.validated_speaker = True
else:
if is_punctuation(token):
state.last_punctuation_index = i
if state.last_punctuation_index == i-1:
if token.speaker != previous_speaker:
token.validated_speaker = True
# perfect, diarization perfectly aligned
last_punctuation = None
else:
speaker_change_pos, new_speaker = next_speaker_change(i, tokens, speaker)
if speaker_change_pos:
# Corrects delay:
# That was the idea. <Okay> haha |SPLIT SPEAKER| that's a good one
# should become:
# That was the idea. |SPLIT SPEAKER| <Okay> haha that's a good one
token.corrected_speaker = new_speaker
token.validated_speaker = True
elif speaker != previous_speaker:
if not (speaker == -2 or previous_speaker == -2):
if next_punctuation_change(i, tokens):
# Corrects advance:
# Are you |SPLIT SPEAKER| <okay>? yeah, sure. Absolutely
# should become:
# Are you <okay>? |SPLIT SPEAKER| yeah, sure. Absolutely
token.corrected_speaker = previous_speaker
token.validated_speaker = True
else: #Problematic, unless the language has no punctuation. We append to the previous line, unless disable_punctuation_split is set to True.
if not disable_punctuation_split:
token.corrected_speaker = previous_speaker
token.validated_speaker = False
if token.validated_speaker:
state.last_validated_token = i
previous_speaker = token.corrected_speaker
previous_speaker = 1
lines = []
for token in tokens:
if int(token.corrected_speaker) != int(previous_speaker):
lines.append(new_line(token))
else:
append_token_to_last_line(lines, sep, token)
previous_speaker = token.corrected_speaker
if lines:
unassigned_translated_segments = []
for ts in translation_validated_segments:
assigned = False
for line in lines:
if ts and ts.overlaps_with(line):
if ts.is_within(line):
line.translation += ts.text + ' '
assigned = True
break
else:
ts0, ts1 = ts.approximate_cut_at(line.end)
if ts0 and line.overlaps_with(ts0):
line.translation += ts0.text + ' '
if ts1:
unassigned_translated_segments.append(ts1)
assigned = True
break
if not assigned:
unassigned_translated_segments.append(ts)
if unassigned_translated_segments:
for line in lines:
remaining_segments = []
for ts in unassigned_translated_segments:
if ts and ts.overlaps_with(line):
line.translation += ts.text + ' '
else:
remaining_segments.append(ts)
unassigned_translated_segments = remaining_segments #maybe do smth in the future about that
if state.buffer_transcription and lines:
lines[-1].end = max(state.buffer_transcription.end, lines[-1].end)
return lines, undiarized_text

View File

@@ -18,7 +18,7 @@ from whisperlivekit.backend_support import (
import torch
from whisperlivekit.simul_whisper.config import AlignAttConfig
from whisperlivekit.simul_whisper.simul_whisper import PaddedAlignAttWhisper
from whisperlivekit.simul_whisper.simul_whisper import AlignAtt
logger = logging.getLogger(__name__)
@@ -34,6 +34,8 @@ if HAS_FASTER_WHISPER:
else:
WhisperModel = None
MIN_DURATION_REAL_SILENCE = 5
class SimulStreamingOnlineProcessor:
SAMPLING_RATE = 16000
@@ -56,23 +58,29 @@ class SimulStreamingOnlineProcessor:
def load_new_backend(self):
model = self.asr.get_new_model_instance()
self.model = PaddedAlignAttWhisper(
self.model = AlignAtt(
cfg=self.asr.cfg,
loaded_model=model,
mlx_encoder=self.asr.mlx_encoder,
fw_encoder=self.asr.fw_encoder,
)
def insert_silence(self, silence_duration, offset):
def start_silence(self):
tokens, processed_upto = self.process_iter(is_last=True)
return tokens, processed_upto
def end_silence(self, silence_duration, offset):
"""
If silences are > 5s, we do a complete context clear. Otherwise, we just insert a small silence and shift the last_attend_frame
If silences are > MIN_DURATION_REAL_SILENCE, we do a complete context clear. Otherwise, we just insert a small silence and shift the last_attend_frame
"""
if silence_duration < 5:
gap_silence = torch.zeros(int(16000*silence_duration))
self.model.insert_audio(gap_silence)
# self.global_time_offset += silence_duration
else:
self.process_iter(is_last=True) #we want to totally process what remains in the buffer.
self.end += silence_duration
long_silence = silence_duration >= MIN_DURATION_REAL_SILENCE
if not long_silence:
gap_len = int(16000 * silence_duration)
if gap_len > 0:
gap_silence = torch.zeros(gap_len)
self.model.insert_audio(gap_silence)
if long_silence:
self.model.refresh_segment(complete=True)
self.model.global_time_offset = silence_duration + offset
@@ -300,7 +308,7 @@ class SimulStreamingASR():
if warmup_audio is not None:
warmup_audio = torch.from_numpy(warmup_audio).float()
if self.fast_encoder:
temp_model = PaddedAlignAttWhisper(
temp_model = AlignAtt(
cfg=self.cfg,
loaded_model=whisper_model,
mlx_encoder=self.mlx_encoder,

View File

@@ -1,43 +0,0 @@
class Tokens:
def __init__(self, tokens):
self.tokens = tokens
# def clone(self):
# return Tokens(self.tokens.clone())
def __str__(self):
return str(self.tokens.tolist())
def __repr__(self):
return self.__str__()
class BeamTokens(Tokens):
def __init__(self, tokens, beam_size):
self.tokens = tokens
self.beam_size = beam_size
def clone(self):
return BeamTokens(self.tokens.clone())
def __str__(self):
return f"BeamTokens({self.tokens.tolist()}, beam_size={self.beam_size})"
def __repr__(self):
return self.__str__()
def as_text(self, tokenizer):
return tokenizer.decode(self.tokens)
class Logits(Tokens):
def __init__(self, logits):
super().__init__(logits)
# def clone(self):
# return Logits(self.tokens.clone(), self.beam_size)
def __str__(self):
# return "abc"
return f"Logits({self.tokens.shape})"
def __repr__(self):
return self.__str__()

View File

@@ -1,17 +1,16 @@
# This code was originally in simul_whisper/transcriber/simul_whisper.py . It is adapted a lot for SimulStreaming.
import os
import logging
import torch
import torch.nn.functional as F
import numpy as np
from whisperlivekit.whisper import load_model, DecodingOptions, tokenizer
from whisperlivekit.whisper import DecodingOptions, tokenizer
from .config import AlignAttConfig
from whisperlivekit.timed_objects import ASRToken
from whisperlivekit.whisper.audio import log_mel_spectrogram, TOKENS_PER_SECOND, pad_or_trim, N_SAMPLES, N_FRAMES
from whisperlivekit.whisper.timing import median_filter
from whisperlivekit.whisper.decoding import GreedyDecoder, BeamSearchDecoder, SuppressTokens, detect_language
from whisperlivekit.whisper.decoding import GreedyDecoder, BeamSearchDecoder, SuppressTokens
from .beam import BeamPyTorchInference
from .eow_detection import fire_at_boundary, load_cif
import os
@@ -22,28 +21,37 @@ from whisperlivekit.backend_support import (
faster_backend_available,
)
import numpy as np
from ..timed_objects import PUNCTUATION_MARKS
from .generation_progress import *
DEC_PAD = 50257
logger = logging.getLogger(__name__)
HAS_MLX_WHISPER = False
HAS_FASTER_WHISPER = False
if mlx_backend_available():
from mlx_whisper.audio import log_mel_spectrogram as mlx_log_mel_spectrogram
from mlx_whisper.transcribe import pad_or_trim as mlx_pad_or_trim
HAS_MLX_WHISPER = True
if faster_backend_available():
from faster_whisper.audio import pad_or_trim as fw_pad_or_trim
from faster_whisper.feature_extractor import FeatureExtractor
HAS_FASTER_WHISPER = True
class PaddedAlignAttWhisper:
USE_MLCORE = False
def load_coreml_encoder():
try:
from coremltools.models import MLModel
except ImportError:
logger.warning("coremltools is not installed")
return None
COREML_ENCODER_PATH = os.environ.get("MLCORE_ENCODER_PATH", "whisperlivekit/whisper/whisper_encoder.mlpackage")
_coreml_encoder = MLModel(COREML_ENCODER_PATH)
spec = _coreml_encoder.get_spec()
_coreml_input_name = spec.description.input[0].name if spec.description.input else "mel"
_coreml_output_name = spec.description.output[0].name if spec.description.output else None
return _coreml_encoder, _coreml_input_name, _coreml_output_name
class AlignAtt:
def __init__(
self,
cfg: AlignAttConfig,
@@ -55,9 +63,13 @@ class PaddedAlignAttWhisper:
self.model = loaded_model
self.mlx_encoder = mlx_encoder
self.fw_encoder = fw_encoder
if fw_encoder:
self.fw_feature_extractor = FeatureExtractor(feature_size=self.model.dims.n_mels)
self.coreml_encoder_tuple = None
if USE_MLCORE:
self.coreml_encoder_tuple = load_coreml_encoder()
self.use_mlcore = self.coreml_encoder_tuple is not None
self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
@@ -254,7 +266,7 @@ class PaddedAlignAttWhisper:
logger.debug("Refreshing segment:")
self.init_tokens()
self.last_attend_frame = -self.cfg.rewind_threshold
self.detected_language = None
# self.detected_language = None
self.cumulative_time_offset = 0.0
self.init_context()
logger.debug(f"Context: {self.context}")
@@ -393,15 +405,28 @@ class PaddedAlignAttWhisper:
else:
input_segments = self.segments[0]
# if self.cfg.language == "auto" and self.reset_tokenizer_to_auto_next_call:
# logger.debug("Resetting tokenizer to auto for new sentence.")
# self.create_tokenizer(None)
# self.detected_language = None
# self.init_tokens()
# self.reset_tokenizer_to_auto_next_call = False
# NEW: we can use a different encoder before the standard Whisper decoder computes cross-attention through its hooks
beg_encode = time()
if self.use_mlcore:
coreml_encoder, coreml_input_name, coreml_output_name = self.coreml_encoder_tuple
mel_padded = log_mel_spectrogram(
input_segments,
n_mels=self.model.dims.n_mels,
padding=N_SAMPLES,
device="cpu",
).unsqueeze(0)
mel = pad_or_trim(mel_padded, N_FRAMES)
content_mel_len = int((mel_padded.shape[2] - mel.shape[2]) / 2)
mel_np = np.ascontiguousarray(mel.numpy())
ml_inputs = {coreml_input_name or "mel": mel_np}
coreml_outputs = coreml_encoder.predict(ml_inputs)
if coreml_output_name and coreml_output_name in coreml_outputs:
encoder_feature_np = coreml_outputs[coreml_output_name]
else:
encoder_feature_np = next(iter(coreml_outputs.values()))
encoder_feature = torch.as_tensor(
np.array(encoder_feature_np),
device=self.device,
)
if self.mlx_encoder:
mlx_mel_padded = mlx_log_mel_spectrogram(audio=input_segments.detach(), n_mels=self.model.dims.n_mels, padding=N_SAMPLES)
mlx_mel = mlx_pad_or_trim(mlx_mel_padded, N_FRAMES, axis=-2)
@@ -616,10 +641,9 @@ class PaddedAlignAttWhisper:
timestamp_idx += len(word_tokens)
timestamp_entry = ASRToken(
start=current_timestamp,
end=current_timestamp + 0.1,
start=round(current_timestamp, 2),
end=round(current_timestamp + 0.1, 2),
text= word,
probability=0.95,
speaker=self.speaker,
detected_language=self.detected_language
).with_offset(

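For reference, a standalone sketch of the Core ML encoder call wrapped by the branch above, assuming a `whisper_encoder.mlpackage` produced by `convert_encoder_to_coreml` and an 80-mel model (tiny/base sizes); input and output names are discovered from the spec exactly as `load_coreml_encoder` does:

import numpy as np
from coremltools.models import MLModel

encoder = MLModel("whisper_encoder.mlpackage")
spec = encoder.get_spec()
input_name = spec.description.input[0].name  # typically "mel"
mel = np.zeros((1, 80, 3000), dtype=np.float32)  # (batch, n_mels, N_FRAMES)
outputs = encoder.predict({input_name: mel})
print({name: np.asarray(val).shape for name, val in outputs.items()})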
View File

@@ -0,0 +1,261 @@
"""
ALPHA. results are not great yet
To replace `whisperlivekit.silero_vad_iterator import FixedVADIterator`
by `from whisperlivekit.ten_vad_iterator import TenVADIterator`
Use self.vac = TenVADIterator() instead of self.vac = FixedVADIterator(models.vac_model)
"""
import numpy as np
from ten_vad import TenVad
class TenVADIterator:
def __init__(self,
threshold: float = 0.5,
sampling_rate: int = 16000,
min_silence_duration_ms: int = 100,
speech_pad_ms: int = 30):
self.vad = TenVad()
self.threshold = threshold
self.sampling_rate = sampling_rate
self.min_silence_duration_ms = min_silence_duration_ms
self.speech_pad_ms = speech_pad_ms
self.min_silence_samples = int(sampling_rate * min_silence_duration_ms / 1000)
self.speech_pad_samples = int(sampling_rate * speech_pad_ms / 1000)
self.reset_states()
def reset_states(self):
self.triggered = False
self.temp_end = 0
self.current_sample = 0
self.buffer = np.array([], dtype=np.float32)
def __call__(self, x, return_seconds=False):
if not isinstance(x, np.ndarray):
x = np.array(x, dtype=np.float32)
self.buffer = np.append(self.buffer, x)
chunk_size = 256
ret = None
while len(self.buffer) >= chunk_size:
chunk = self.buffer[:chunk_size].astype(np.int16)
self.buffer = self.buffer[chunk_size:]
window_size_samples = len(chunk)
self.current_sample += window_size_samples
speech_prob, speech_flag = self.vad.process(chunk)
if (speech_prob >= self.threshold) and self.temp_end:
self.temp_end = 0
if (speech_prob >= self.threshold) and not self.triggered:
self.triggered = True
speech_start = max(0, self.current_sample - self.speech_pad_samples - window_size_samples)
result = {'start': int(speech_start) if not return_seconds else round(speech_start / self.sampling_rate, 1)}
if ret is None:
ret = result
elif "end" in ret:
ret = result
else:
ret.update(result)
if (speech_prob < self.threshold - 0.15) and self.triggered:
if not self.temp_end:
self.temp_end = self.current_sample
if self.current_sample - self.temp_end < self.min_silence_samples:
continue
else:
speech_end = self.temp_end + self.speech_pad_samples - window_size_samples
self.temp_end = 0
self.triggered = False
result = {'end': int(speech_end) if not return_seconds else round(speech_end / self.sampling_rate, 1)}
if ret is None:
ret = result
else:
ret.update(result)
return ret if ret != {} else None
def test_on_record_wav():
import os
from pathlib import Path
audio_file = Path("record.wav")
if not audio_file.exists():
return
import soundfile as sf
audio_data, sample_rate = sf.read(str(audio_file), dtype='float32')
if len(audio_data.shape) > 1:
audio_data = np.mean(audio_data, axis=1)
vad = TenVADIterator(
threshold=0.5,
sampling_rate=sample_rate,
min_silence_duration_ms=100,
speech_pad_ms=30
)
chunk_size = 1024
speech_segments = []
current_segment = None
for i in range(0, len(audio_data), chunk_size):
chunk = audio_data[i:i+chunk_size]
if chunk.dtype != np.int16:
chunk_int16 = (chunk * 32767.0).astype(np.int16)
else:
chunk_int16 = chunk
result = vad(chunk_int16, return_seconds=True)
if result is not None:
if 'start' in result:
current_segment = {'start': result['start'], 'end': None}
print(f"Speech start detected at {result['start']:.2f}s")
elif 'end' in result:
if current_segment:
current_segment['end'] = result['end']
duration = current_segment['end'] - current_segment['start']
speech_segments.append(current_segment)
print(f"Speech end detected at {result['end']:.2f}s (duration: {duration:.2f}s)")
current_segment = None
else:
print(f"Speech end detected at {result['end']:.2f}s (no corresponding start)")
if current_segment and current_segment['end'] is None:
current_segment['end'] = len(audio_data) / sample_rate
speech_segments.append(current_segment)
print(f"End of file (last segment at {current_segment['start']:.2f}s)")
print("-" * 60)
print(f"\nSummary:")
print(f"Number of speech segments detected: {len(speech_segments)}")
if speech_segments:
total_speech_time = sum(seg['end'] - seg['start'] for seg in speech_segments)
total_time = len(audio_data) / sample_rate
speech_ratio = (total_speech_time / total_time) * 100
print(f"Total speech time: {total_speech_time:.2f}s")
print(f"Total file time: {total_time:.2f}s")
print(f"Speech ratio: {speech_ratio:.1f}%")
print(f"\nDetected segments:")
for i, seg in enumerate(speech_segments, 1):
print(f" {i}. {seg['start']:.2f}s - {seg['end']:.2f}s (duration: {seg['end'] - seg['start']:.2f}s)")
else:
print("No speech segments detected")
print("\n" + "=" * 60)
print("Extracting silence segments...")
silence_segments = []
total_time = len(audio_data) / sample_rate
if not speech_segments:
silence_segments = [{'start': 0.0, 'end': total_time}]
else:
if speech_segments[0]['start'] > 0:
silence_segments.append({'start': 0.0, 'end': speech_segments[0]['start']})
for i in range(len(speech_segments) - 1):
silence_start = speech_segments[i]['end']
silence_end = speech_segments[i + 1]['start']
if silence_end > silence_start:
silence_segments.append({'start': silence_start, 'end': silence_end})
if speech_segments[-1]['end'] < total_time:
silence_segments.append({'start': speech_segments[-1]['end'], 'end': total_time})
silence_audio = np.array([], dtype=audio_data.dtype)
for seg in silence_segments:
start_sample = int(seg['start'] * sample_rate)
end_sample = int(seg['end'] * sample_rate)
start_sample = max(0, min(start_sample, len(audio_data)))
end_sample = max(0, min(end_sample, len(audio_data)))
if end_sample > start_sample:
silence_audio = np.concatenate([silence_audio, audio_data[start_sample:end_sample]])
if len(silence_audio) > 0:
output_file = "record_silence_only.wav"
try:
import soundfile as sf
sf.write(output_file, silence_audio, sample_rate)
print(f"Silence file saved: {output_file}")
except ImportError:
try:
from scipy.io import wavfile
if silence_audio.dtype == np.float32:
silence_audio_int16 = (silence_audio * 32767.0).astype(np.int16)
else:
silence_audio_int16 = silence_audio.astype(np.int16)
wavfile.write(output_file, sample_rate, silence_audio_int16)
print(f"Silence file saved: {output_file}")
except ImportError:
print("Unable to save: soundfile or scipy required")
total_silence_time = sum(seg['end'] - seg['start'] for seg in silence_segments)
silence_ratio = (total_silence_time / total_time) * 100
print(f"Total silence duration: {total_silence_time:.2f}s")
print(f"Silence ratio: {silence_ratio:.1f}%")
print(f"Number of silence segments: {len(silence_segments)}")
print(f"\nYou can listen to {output_file} to verify that only silences are present.")
else:
print("No silence segments found (file entirely speech)")
print("\n" + "=" * 60)
print("Extracting speech segments...")
if speech_segments:
speech_audio = np.array([], dtype=audio_data.dtype)
for seg in speech_segments:
start_sample = int(seg['start'] * sample_rate)
end_sample = int(seg['end'] * sample_rate)
start_sample = max(0, min(start_sample, len(audio_data)))
end_sample = max(0, min(end_sample, len(audio_data)))
if end_sample > start_sample:
speech_audio = np.concatenate([speech_audio, audio_data[start_sample:end_sample]])
if len(speech_audio) > 0:
output_file = "record_speech_only.wav"
try:
import soundfile as sf
sf.write(output_file, speech_audio, sample_rate)
print(f"Speech file saved: {output_file}")
except ImportError:
try:
from scipy.io import wavfile
if speech_audio.dtype == np.float32:
speech_audio_int16 = (speech_audio * 32767.0).astype(np.int16)
else:
speech_audio_int16 = speech_audio.astype(np.int16)
wavfile.write(output_file, sample_rate, speech_audio_int16)
print(f"Speech file saved: {output_file}")
except ImportError:
print("Unable to save: soundfile or scipy required")
total_speech_time = sum(seg['end'] - seg['start'] for seg in speech_segments)
speech_ratio = (total_speech_time / total_time) * 100
print(f"Total speech duration: {total_speech_time:.2f}s")
print(f"Speech ratio: {speech_ratio:.1f}%")
print(f"Number of speech segments: {len(speech_segments)}")
print(f"\nYou can listen to {output_file} to verify that only speech segments are present.")
else:
print("No speech audio to extract")
else:
print("No speech segments found (file entirely silence)")
if __name__ == "__main__":
test_on_record_wav()

View File

@@ -1,5 +1,5 @@
from dataclasses import dataclass, field
from typing import Optional, Any, List
from typing import Optional, List, Union, Dict, Any
from datetime import timedelta
PUNCTUATION_MARKS = {'.', '!', '?', '。', '！', '？'}
@@ -8,22 +8,19 @@ def format_time(seconds: float) -> str:
"""Format seconds as HH:MM:SS."""
return str(timedelta(seconds=int(seconds)))
@dataclass
class TimedText:
class Timed:
start: Optional[float] = 0
end: Optional[float] = 0
@dataclass
class TimedText(Timed):
text: Optional[str] = ''
speaker: Optional[int] = -1
probability: Optional[float] = None
is_dummy: Optional[bool] = False
detected_language: Optional[str] = None
def is_punctuation(self):
return self.text.strip() in PUNCTUATION_MARKS
def overlaps_with(self, other: 'TimedText') -> bool:
return not (self.end <= other.start or other.end <= self.start)
def has_punctuation(self) -> bool:
return any(char in PUNCTUATION_MARKS for char in self.text.strip())
def is_within(self, other: 'TimedText') -> bool:
return other.contains_timespan(self)
@@ -31,27 +28,25 @@ class TimedText:
def duration(self) -> float:
return self.end - self.start
def contains_time(self, time: float) -> bool:
return self.start <= time <= self.end
def contains_timespan(self, other: 'TimedText') -> bool:
return self.start <= other.start and self.end >= other.end
def __bool__(self):
def __bool__(self) -> bool:
return bool(self.text)
def __str__(self) -> str:
return str(self.text)
@dataclass()
class ASRToken(TimedText):
corrected_speaker: Optional[int] = -1
validated_speaker: bool = False
validated_text: bool = False
validated_language: bool = False
def with_offset(self, offset: float) -> "ASRToken":
"""Return a new token with the time offset added."""
return ASRToken(self.start + offset, self.end + offset, self.text, self.speaker, self.probability, detected_language=self.detected_language)
return ASRToken(self.start + offset, self.end + offset, self.text, self.speaker, detected_language=self.detected_language)
def is_silence(self) -> bool:
return False
@dataclass
class Sentence(TimedText):
@@ -70,68 +65,94 @@ class Transcript(TimedText):
sep: Optional[str] = None,
offset: float = 0
) -> "Transcript":
"""Collapse multiple ASR tokens into a single transcript span."""
sep = sep if sep is not None else ' '
text = sep.join(token.text for token in tokens)
probability = sum(token.probability for token in tokens if token.probability) / len(tokens) if tokens else None
if tokens:
start = offset + tokens[0].start
end = offset + tokens[-1].end
else:
start = None
end = None
return cls(start, end, text, probability=probability)
return cls(start, end, text)
@dataclass
class SpeakerSegment(TimedText):
class SpeakerSegment(Timed):
"""Represents a segment of audio attributed to a specific speaker.
Neither text nor probability is associated with this segment.
"""
speaker: Optional[int] = -1
pass
@dataclass
class Translation(TimedText):
pass
def approximate_cut_at(self, cut_time):
"""
Each word in text is considered to be of duration (end-start)/len(words in text)
"""
if not self.text or not self.contains_time(cut_time):
return self, None
words = self.text.split()
num_words = len(words)
if num_words == 0:
return self, None
duration_per_word = self.duration() / num_words
cut_word_index = int((cut_time - self.start) / duration_per_word)
if cut_word_index >= num_words:
cut_word_index = num_words - 1
text0 = " ".join(words[:cut_word_index])
text1 = " ".join(words[cut_word_index:])
segment0 = Translation(start=self.start, end=cut_time, text=text0)
segment1 = Translation(start=cut_time, end=self.end, text=text1)
return segment0, segment1
@dataclass
class Silence():
duration: float
start: Optional[float] = None
end: Optional[float] = None
duration: Optional[float] = None
is_starting: bool = False
has_ended: bool = False
def compute_duration(self) -> Optional[float]:
if self.start is None or self.end is None:
return None
self.duration = self.end - self.start
return self.duration
def is_silence(self) -> bool:
return True
@dataclass
class Segment(TimedText):
"""Generic contiguous span built from tokens or silence markers."""
start: Optional[float]
end: Optional[float]
text: Optional[str]
speaker: Optional[str]
@classmethod
def from_tokens(
cls,
tokens: List[Union[ASRToken, Silence]],
is_silence: bool = False
) -> Optional["Segment"]:
"""Return a normalized segment representing the provided tokens."""
if not tokens:
return None
start_token = tokens[0]
end_token = tokens[-1]
if is_silence:
return cls(
start=start_token.start,
end=end_token.end,
text=None,
speaker=-2
)
else:
return cls(
start=start_token.start,
end=end_token.end,
text=''.join(token.text for token in tokens),
speaker=-1,
detected_language=start_token.detected_language
)
def is_silence(self) -> bool:
"""True when this segment represents a silence gap."""
return self.speaker == -2
@dataclass
class Line(TimedText):
translation: str = ''
def to_dict(self):
_dict = {
def to_dict(self) -> Dict[str, Any]:
"""Serialize the line for frontend consumption."""
_dict: Dict[str, Any] = {
'speaker': int(self.speaker) if self.speaker != -1 else 1,
'text': self.text,
'start': format_time(self.start),
@@ -143,6 +164,33 @@ class Line(TimedText):
_dict['detected_language'] = self.detected_language
return _dict
def build_from_tokens(self, tokens: List[ASRToken]) -> "Line":
"""Populate line attributes from a contiguous token list."""
self.text = ''.join([token.text for token in tokens])
self.start = tokens[0].start
self.end = tokens[-1].end
self.speaker = 1
self.detected_language = tokens[0].detected_language
return self
def build_from_segment(self, segment: Segment) -> "Line":
"""Populate the line fields from a pre-built segment."""
self.text = segment.text
self.start = segment.start
self.end = segment.end
self.speaker = segment.speaker
self.detected_language = segment.detected_language
return self
def is_silent(self) -> bool:
return self.speaker == -2
class SilentLine(Line):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.speaker = -2
self.text = ''
@dataclass
class FrontData():
@@ -155,8 +203,9 @@ class FrontData():
remaining_time_transcription: float = 0.
remaining_time_diarization: float = 0.
def to_dict(self):
_dict = {
def to_dict(self) -> Dict[str, Any]:
"""Serialize the front-end data payload."""
_dict: Dict[str, Any] = {
'status': self.status,
'lines': [line.to_dict() for line in self.lines if (line.text or line.speaker == -2)],
'buffer_transcription': self.buffer_transcription,
@@ -176,14 +225,22 @@ class ChangeSpeaker:
@dataclass
class State():
tokens: list = field(default_factory=list)
last_validated_token: int = 0
last_punctuation_index: Optional[int] = None
translation_validated_segments: list = field(default_factory=list)
buffer_translation: str = field(default_factory=Transcript)
buffer_transcription: str = field(default_factory=Transcript)
"""Unified state class for audio processing.
Contains both persistent state (tokens, buffers) and temporary update buffers
(new_* fields) that are consumed by TokensAlignment.
"""
# Persistent state
tokens: List[ASRToken] = field(default_factory=list)
buffer_transcription: Transcript = field(default_factory=Transcript)
end_buffer: float = 0.0
end_attributed_speaker: float = 0.0
remaining_time_transcription: float = 0.0
remaining_time_diarization: float = 0.0
beg_loop: Optional[int] = None
# Temporary update buffers (consumed by TokensAlignment.update())
new_tokens: List[Union[ASRToken, Silence]] = field(default_factory=list)
new_translation: List[Any] = field(default_factory=list)
new_diarization: List[Any] = field(default_factory=list)
new_tokens_buffer: List[Any] = field(default_factory=list) # only when local agreement
new_translation_buffer: TimedText = field(default_factory=TimedText)
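
A worked example of `approximate_cut_at` from the hunk above, assuming `Translation` is importable from whisperlivekit.timed_objects: with four words over four seconds, each word is attributed one second, so a cut at 2.0 s splits after the second word.

from whisperlivekit.timed_objects import Translation

seg = Translation(start=0.0, end=4.0, text="one two three four")
first, second = seg.approximate_cut_at(2.0)
print(first.start, first.end, first.text)     # 0.0 2.0 one two
print(second.start, second.end, second.text)  # 2.0 4.0 three four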

View File

@@ -0,0 +1,177 @@
from time import time
from typing import Optional, List, Tuple, Union, Any
from whisperlivekit.timed_objects import Line, SilentLine, ASRToken, SpeakerSegment, Silence, TimedText, Segment
class TokensAlignment:
def __init__(self, state: Any, args: Any, sep: Optional[str]) -> None:
self.state = state
self.diarization = args.diarization
self._tokens_index: int = 0
self._diarization_index: int = 0
self._translation_index: int = 0
self.all_tokens: List[ASRToken] = []
self.all_diarization_segments: List[SpeakerSegment] = []
self.all_translation_segments: List[Any] = []
self.new_tokens: List[ASRToken] = []
self.new_diarization: List[SpeakerSegment] = []
self.new_translation: List[Any] = []
self.new_translation_buffer: Union[TimedText, str] = TimedText()
self.new_tokens_buffer: List[Any] = []
self.sep: str = sep if sep is not None else ' '
self.beg_loop: Optional[float] = None
def update(self) -> None:
"""Drain state buffers into the running alignment context."""
self.new_tokens, self.state.new_tokens = self.state.new_tokens, []
self.new_diarization, self.state.new_diarization = self.state.new_diarization, []
self.new_translation, self.state.new_translation = self.state.new_translation, []
self.new_tokens_buffer, self.state.new_tokens_buffer = self.state.new_tokens_buffer, []
self.all_tokens.extend(self.new_tokens)
self.all_diarization_segments.extend(self.new_diarization)
self.all_translation_segments.extend(self.new_translation)
self.new_translation_buffer = self.state.new_translation_buffer
def add_translation(self, line: Line) -> None:
"""Append translated text segments that overlap with a line."""
for ts in self.all_translation_segments:
if ts.is_within(line):
line.translation += ts.text + (self.sep if ts.text else '')
elif line.translation:
break
def compute_punctuations_segments(self, tokens: Optional[List[ASRToken]] = None) -> List[Segment]:
"""Group tokens into segments split by punctuation and explicit silence."""
segments = []
segment_start_idx = 0
for i, token in enumerate(self.all_tokens):
if token.is_silence():
previous_segment = Segment.from_tokens(
tokens=self.all_tokens[segment_start_idx: i],
)
if previous_segment:
segments.append(previous_segment)
segment = Segment.from_tokens(
tokens=[token],
is_silence=True
)
segments.append(segment)
segment_start_idx = i+1
else:
if token.has_punctuation():
segment = Segment.from_tokens(
tokens=self.all_tokens[segment_start_idx: i+1],
)
segments.append(segment)
segment_start_idx = i+1
final_segment = Segment.from_tokens(
tokens=self.all_tokens[segment_start_idx:],
)
if final_segment:
segments.append(final_segment)
return segments
def concatenate_diar_segments(self) -> List[SpeakerSegment]:
"""Merge consecutive diarization slices that share the same speaker."""
if not self.all_diarization_segments:
return []
merged = [self.all_diarization_segments[0]]
for segment in self.all_diarization_segments[1:]:
if segment.speaker == merged[-1].speaker:
merged[-1].end = segment.end
else:
merged.append(segment)
return merged
@staticmethod
def intersection_duration(seg1: TimedText, seg2: TimedText) -> float:
"""Return the overlap duration between two timed segments."""
start = max(seg1.start, seg2.start)
end = min(seg1.end, seg2.end)
return max(0, end - start)
def get_lines_diarization(self) -> Tuple[List[Line], str]:
"""Build lines when diarization is enabled and track overflow buffer."""
diarization_buffer = ''
punctuation_segments = self.compute_punctuations_segments()
diarization_segments = self.concatenate_diar_segments()
for punctuation_segment in punctuation_segments:
if not punctuation_segment.is_silence():
if diarization_segments and punctuation_segment.start >= diarization_segments[-1].end:
diarization_buffer += punctuation_segment.text
else:
max_overlap = 0.0
max_overlap_speaker = 1
for diarization_segment in diarization_segments:
intersec = self.intersection_duration(punctuation_segment, diarization_segment)
if intersec > max_overlap:
max_overlap = intersec
max_overlap_speaker = diarization_segment.speaker + 1
punctuation_segment.speaker = max_overlap_speaker
lines = []
if punctuation_segments:
lines = [Line().build_from_segment(punctuation_segments[0])]
for segment in punctuation_segments[1:]:
if segment.speaker == lines[-1].speaker:
if lines[-1].text:
lines[-1].text += segment.text
lines[-1].end = segment.end
else:
lines.append(Line().build_from_segment(segment))
return lines, diarization_buffer
def get_lines(
self,
diarization: bool = False,
translation: bool = False,
current_silence: Optional[Silence] = None
) -> Tuple[List[Line], str, Union[str, TimedText]]:
"""Return the formatted lines plus buffers, optionally with diarization/translation."""
if diarization:
lines, diarization_buffer = self.get_lines_diarization()
else:
diarization_buffer = ''
lines = []
current_line_tokens = []
for token in self.all_tokens:
if token.is_silence():
if current_line_tokens:
lines.append(Line().build_from_tokens(current_line_tokens))
current_line_tokens = []
end_silence = token.end if token.has_ended else time() - self.beg_loop
if lines and lines[-1].is_silent():
lines[-1].end = end_silence
else:
lines.append(SilentLine(
start = token.start,
end = end_silence
))
else:
current_line_tokens.append(token)
if current_line_tokens:
lines.append(Line().build_from_tokens(current_line_tokens))
if current_silence:
end_silence = current_silence.end if current_silence.has_ended else time() - self.beg_loop
if lines and lines[-1].is_silent():
lines[-1].end = end_silence
else:
lines.append(SilentLine(
start = current_silence.start,
end = end_silence
))
if translation:
for line in lines:
    if not line.is_silent():
        self.add_translation(line)
return lines, diarization_buffer, self.new_translation_buffer.text
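A minimal usage sketch of the producer/consumer loop this class implements (appended here for illustration; `args` only needs the `diarization` flag, and the State/ASRToken import path follows the timed_objects diff above):

if __name__ == "__main__":
    from types import SimpleNamespace
    from whisperlivekit.timed_objects import State, ASRToken

    state = State()
    alignment = TokensAlignment(state, SimpleNamespace(diarization=False), sep=' ')
    alignment.beg_loop = 0.0  # normally set once when the audio loop starts

    # producers (ASR / diarization / translation) fill the temporary buffers...
    state.new_tokens.append(ASRToken(start=0.0, end=0.4, text='Hello '))
    state.new_tokens.append(ASRToken(start=0.4, end=0.8, text='world.'))

    # ...and each formatting pass drains them and rebuilds the lines
    alignment.update()
    lines, diarization_buffer, translation_buffer = alignment.get_lines()
    for line in lines:
        print(line.speaker, repr(line.text), line.start, line.end)
    # expected: 1 'Hello world.' 0.0 0.8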

View File

@@ -1,60 +0,0 @@
from typing import Sequence, Callable, Any, Optional, Dict
def _detect_tail_repetition(
seq: Sequence[Any],
key: Callable[[Any], Any] = lambda x: x, # extract comparable value
min_block: int = 1, # set to 2 to ignore 1-token loops like "."
max_tail: int = 300, # search window from the end for speed
prefer: str = "longest", # "longest" coverage or "smallest" block
) -> Optional[Dict]:
vals = [key(x) for x in seq][-max_tail:]
n = len(vals)
best = None
# try every possible block length
for b in range(min_block, n // 2 + 1):
block = vals[-b:]
# count how many times this block repeats contiguously at the very end
count, i = 0, n
while i - b >= 0 and vals[i - b:i] == block:
count += 1
i -= b
if count >= 2:
cand = {
"block_size": b,
"count": count,
"start_index": len(seq) - count * b, # in original seq
"end_index": len(seq),
}
if (best is None or
(prefer == "longest" and count * b > best["count"] * best["block_size"]) or
(prefer == "smallest" and b < best["block_size"])):
best = cand
return best
def trim_tail_repetition(
seq: Sequence[Any],
key: Callable[[Any], Any] = lambda x: x,
min_block: int = 1,
max_tail: int = 300,
prefer: str = "longest",
keep: int = 1, # how many copies of the repeating block to keep at the end (0 or 1 are common)
):
"""
Returns a new sequence with repeated tail trimmed.
keep=1 -> keep a single copy of the repeated block.
keep=0 -> remove all copies of the repeated block.
"""
rep = _detect_tail_repetition(seq, key, min_block, max_tail, prefer)
if not rep:
return seq, False # nothing to trim
b, c = rep["block_size"], rep["count"]
if keep < 0:
keep = 0
if keep >= c:
return seq, False # nothing to trim (already <= keep copies)
# new length = total - (copies_to_remove * block_size)
new_len = len(seq) - (c - keep) * b
return seq[:new_len], True
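A worked example of the keep semantics described above (the helper is defined in this removed module, so the calls assume that context): in [1, 2, 3, 2, 3, 2, 3] the block [2, 3] repeats three times at the tail.

seq = [1, 2, 3, 2, 3, 2, 3]
print(trim_tail_repetition(seq))          # ([1, 2, 3], True)  keep=1: one copy of [2, 3] kept
print(trim_tail_repetition(seq, keep=0))  # ([1], True)        keep=0: all copies removed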

View File

@@ -391,12 +391,11 @@ function renderLinesWithBuffer(
if (idx === lines.length - 1) {
if (!isFinalizing && item.speaker !== -2) {
if (remaining_time_transcription > 0) {
speakerLabel += `<span class="label_transcription"><span class="spinner"></span>Transcription lag <span id='timeInfo'><span class="lag-transcription-value">${fmt1(
remaining_time_transcription
)}</span>s</span></span>`;
}
if (buffer_diarization && remaining_time_diarization > 0) {
if (buffer_diarization && remaining_time_diarization) {
speakerLabel += `<span class="label_diarization"><span class="spinner"></span>Diarization lag<span id='timeInfo'><span class="lag-diarization-value">${fmt1(
remaining_time_diarization
)}</span>s</span></span>`;

View File

@@ -4,17 +4,18 @@ import json
import os
import urllib
import warnings
from typing import List, Optional, Union, Dict
from typing import Dict, List, Optional, Union
import torch
from tqdm import tqdm
from pathlib import Path
from torch import Tensor
from .audio import load_audio, log_mel_spectrogram, pad_or_trim
from .decoding import DecodingOptions, DecodingResult, decode, detect_language
from .model import ModelDimensions, Whisper
from .transcribe import transcribe
from .version import __version__
from whisperlivekit.whisper.audio import load_audio, log_mel_spectrogram, pad_or_trim
from whisperlivekit.whisper.decoding import DecodingOptions, DecodingResult, decode, detect_language
from whisperlivekit.whisper.model import ModelDimensions, Whisper
from whisperlivekit.whisper.transcribe import transcribe
from whisperlivekit.whisper.version import __version__
_MODELS = {
"tiny.en": "https://openaipublic.azureedge.net/main/whisper/models/d3dd57d32accea0b295c96e26691aa14d8822fac7d9d27d5dc00b4ca2826dd03/tiny.en.pt",
@@ -233,13 +234,97 @@ def _convert_hf_state_dict(state_dict: Dict[str, torch.Tensor]) -> Dict[str, tor
return converted if converted else state_dict
def _load_lora_state(lora_path: str):
safe_path = os.path.join(lora_path, "adapter_model.safetensors")
bin_path = os.path.join(lora_path, "adapter_model.bin")
if os.path.isfile(safe_path):
try:
from safetensors.torch import load_file
except ImportError as exc:
raise ImportError(
"Loading LoRA adapters stored as .safetensors requires the `safetensors` package."
) from exc
return load_file(safe_path)
if os.path.isfile(bin_path):
return torch.load(bin_path, map_location="cpu")
raise FileNotFoundError(
f"No adapter weights found under {lora_path}. Expected adapter_model.safetensors or adapter_model.bin."
)
def _collapse_hf_module_name(module: str):
if module.startswith("base_model."):
module = module[len("base_model.") :]
if module.startswith("model.model."):
module = module[len("model.") :]
if not module.startswith("model."):
module = f"model.{module}"
return module
def _apply_lora_adapter(state_dict: Dict[str, Tensor], lora_path: Optional[str]):
if not lora_path:
return
config_path = os.path.join(lora_path, "adapter_config.json")
if not os.path.isfile(config_path):
raise FileNotFoundError(f"Missing adapter_config.json inside {lora_path}")
with open(config_path, "r", encoding="utf-8") as handle:
config = json.load(handle)
if config.get("peft_type") != "LORA":
raise ValueError("Only LoRA adapters are supported.")
r = config.get("r")
alpha = config.get("lora_alpha") or config.get("alpha")
if not r or not alpha:
raise ValueError("LoRA config must include `r` and `lora_alpha`.")
scaling = alpha / r
adapter_state = _load_lora_state(lora_path)
lora_layers: Dict[str, Dict[str, Tensor]] = {}
for key, tensor in adapter_state.items():
if key.endswith("lora_A.weight"):
module = key[: -len(".lora_A.weight")]
lora_layers.setdefault(module, {})["A"] = tensor
elif key.endswith("lora_B.weight"):
module = key[: -len(".lora_B.weight")]
lora_layers.setdefault(module, {})["B"] = tensor
if not lora_layers:
raise ValueError(f"No LoRA tensors found in {lora_path}")
for module, parts in lora_layers.items():
if "A" not in parts or "B" not in parts:
raise ValueError(f"Incomplete LoRA tensors for module '{module}'")
hf_module = _collapse_hf_module_name(module)
hf_weight_key = f"{hf_module}.weight"
delta = parts["B"] @ parts["A"]
delta = delta * scaling
converted = _convert_hf_state_dict({hf_weight_key: delta})
if not converted:
raise KeyError(f"Failed to map LoRA module '{module}' into Whisper state dict.")
target_name, delta_tensor = next(iter(converted.items()))
if target_name not in state_dict:
raise KeyError(
f"LoRA module '{module}' mapped to '{target_name}', but the base model has no such parameter."
)
state_dict[target_name] = state_dict[target_name] + delta_tensor.to(
dtype=state_dict[target_name].dtype, device=state_dict[target_name].device
)
def load_model(
name: str,
device: Optional[Union[str, torch.device]] = None,
download_root: str = None,
in_memory: bool = False,
decoder_only=False,
custom_alignment_heads=None
decoder_only: bool = False,
custom_alignment_heads: Optional[str] = None,
lora_path: Optional[str] = None,
) -> Whisper:
"""
Load a Whisper ASR model
@@ -255,6 +340,8 @@ def load_model(
path to download the model files; by default, it uses "~/.cache/whisper"
in_memory: bool
whether to preload the model weights into host memory
lora_path: str
optional directory containing PEFT LoRA adapter weights (adapter_config + adapter_model)
Returns
-------
@@ -302,6 +389,7 @@ def load_model(
else:
state_dict = checkpoint
state_dict = _convert_hf_state_dict(state_dict)
_apply_lora_adapter(state_dict, lora_path)
if dims_cfg is not None:
dims = ModelDimensions(**dims_cfg)
@@ -329,3 +417,47 @@ def load_model(
model.set_alignment_heads(alignment_heads)
return model.to(device)
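# A hedged usage sketch (directory name is hypothetical): `lora_path` must point
# at a PEFT export holding adapter_config.json plus adapter_model.safetensors or
# adapter_model.bin. The merge performed above is W' = W + (lora_alpha / r) * (B @ A),
# applied to each targeted weight before the module is instantiated.
#
#     model = load_model("base", device="cpu", lora_path="my_whisper_lora")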
def convert_encoder_to_coreml(
model_name = "base",
output_path= "whisper_encoder.mlpackage",
dummy_frames = 3000, #Number of time frames to use for the dummy mel input during tracing
precision = "float16",
):
import coremltools as ct
model = load_model(model_name, device="cpu", decoder_only=False)
encoder = model.encoder.eval().cpu()
dummy_input = torch.randn(
1,
model.dims.n_mels,
dummy_frames,
dtype=next(encoder.parameters()).dtype,
)
with torch.no_grad():
traced_encoder = torch.jit.trace(encoder, dummy_input)
precision_map = {
"float16": ct.precision.FLOAT16,
"fp16": ct.precision.FLOAT16,
"float32": ct.precision.FLOAT32,
"fp32": ct.precision.FLOAT32,
}
coreml_precision = precision_map[precision.lower()]
mlmodel = ct.convert(
traced_encoder,
inputs=[ct.TensorType(name="mel", shape=dummy_input.shape)],
convert_to= "mlprogram",
compute_precision=coreml_precision,
)
output_path = Path(output_path)
mlmodel.save(str(output_path))
return output_path
# if __name__ == "__main__":
#     convert_encoder_to_coreml(model_name="tiny", output_path="whisper_encoder.mlpackage", dummy_frames=3000, precision="float16")
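
A hedged end-to-end sketch tying the Core ML pieces together: convert the encoder once with the helper above, then point `load_coreml_encoder` at the resulting .mlpackage through the `MLCORE_ENCODER_PATH` environment variable it reads (the backend's `USE_MLCORE` flag must also be enabled; the model name here is an illustrative choice).

import os

mlpackage = convert_encoder_to_coreml(model_name="tiny", output_path="whisper_encoder.mlpackage")
# load_coreml_encoder falls back to whisperlivekit/whisper/whisper_encoder.mlpackage
# when this variable is unset.
os.environ["MLCORE_ENCODER_PATH"] = str(mlpackage)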