29 Commits

Author SHA1 Message Date
Quentin Fuxa
6206fff118 0.2.15 2025-11-21 23:52:00 +01:00
Quentin Fuxa
b5067249c0 stt/diar/nllw alignment: internal rework 5 2025-11-20 23:52:00 +01:00
Quentin Fuxa
f4f9831d39 stt/diar/nllw alignment: internal rework 5 2025-11-20 23:52:00 +01:00
Quentin Fuxa
254faaf64c stt/diar/nllw alignment: internal rework 5 2025-11-20 23:52:00 +01:00
Quentin Fuxa
8e7aea4fcf internal rework 4 2025-11-20 23:45:20 +01:00
Quentin Fuxa
270faf2069 internal rework 3 2025-11-20 22:28:30 +01:00
Quentin Fuxa
b7c1cc77cc internal rework 2 2025-11-20 22:06:38 +01:00
Quentin Fuxa
9a45ec221c internal rework 1 2025-11-20 12:58:38 +01:00
Quentin Fuxa
3e13ee6fc3 bump to post4 2025-11-19 21:23:43 +01:00
Quentin Fuxa
b7d20a0ff0 segment attribution in result formatter 2025-11-19 21:10:28 +01:00
Quentin Fuxa
c1bb9c2bde reduce flickering remaining_time_transcription 2025-11-19 19:09:37 +01:00
Quentin Fuxa
11e9def0b2 diarization corrections 2025-11-19 19:06:03 +01:00
Quentin Fuxa
3104f40f6e fixes #279 #278 2025-11-19 18:17:50 +01:00
Quentin Fuxa
e9b4ceeee5 Add audio partial silence in chunks handling. bump to 0.2.14.post3 2025-11-17 22:52:00 +01:00
Quentin Fuxa
437641fb43 reduce min-chunk-size to 0.1, set default model to base 2027-04-25 23:52:00 +02:00
Quentin Fuxa
bfd60b3921 Add audio partial silence in chunks handling. bump to 0.2.14.post2 2025-11-17 22:52:00 +01:00
Quentin Fuxa
1e67bf97f0 improve buffering when use of heavy models 2027-04-25 23:52:00 +02:00
Quentin Fuxa
bbd4fd6cff Merge branch 'improve_EOS_handling' 2025-11-16 22:30:31 +01:00
Quentin Fuxa
28985962a0 Silence handling: finish transcription even if not validated at the BEGINNING of the silence 2025-11-16 22:29:08 +01:00
Quentin Fuxa
a38c103fcd simulstreaming coreml encoder compatibility 2025-11-16 21:24:14 +01:00
Quentin Fuxa
4d2ffb24f8 coreml conversion 2025-11-16 19:11:43 +01:00
Quentin Fuxa
1bbbb7903c lora loader in shared whisper core 2025-11-16 18:44:35 +01:00
Quentin Fuxa
bcffdbc6b3 bump to 0.2.14 2025-11-15 20:19:09 +01:00
Quentin Fuxa
80b77998f9 Refactor backend handling 2025-11-15 19:51:41 +01:00
Quentin Fuxa
d310f7e25f hf compatibility 2025-11-15 18:34:19 +01:00
Quentin Fuxa
8d9be88fe6 translation buffer is now displayed in frontend 2025-11-10 15:22:26 +01:00
Quentin Fuxa
16461052ed task to direct-english-translation 2025-11-10 13:20:26 +01:00
Quentin Fuxa
5491dbd824 last_validated_token handled in state 2025-11-10 13:18:52 +01:00
Quentin Fuxa
13401ffe24 whisper core at root of wlk 2025-11-10 12:17:18 +01:00
50 changed files with 1685 additions and 1506 deletions

View File

@@ -45,7 +45,7 @@ pip install whisperlivekit
#### Quick Start
1. **Start the transcription server:**
```bash
whisperlivekit-server --model base --language en
wlk --model base --language en
```
2. **Open your browser** and navigate to `http://localhost:8000`. Start speaking and watch your words appear in real-time!
@@ -53,6 +53,7 @@ pip install whisperlivekit
> - See [tokenizer.py](https://github.com/QuentinFuxa/WhisperLiveKit/blob/main/whisperlivekit/simul_whisper/whisper/tokenizer.py) for the list of all available languages.
> - For HTTPS requirements, see the **Parameters** section for SSL configuration options.
> - The CLI entry point is exposed as both `wlk` and `whisperlivekit-server`; they are equivalent.
#### Use it to capture audio from web pages.
@@ -68,13 +69,12 @@ Go to `chrome-extension` for instructions.
| Optional | `pip install` |
|-----------|-------------|
| **Speaker diarization** | `git+https://github.com/NVIDIA/NeMo.git@main#egg=nemo_toolkit[asr]` |
| **Windows/Linux optimizations** | `faster-whisper` |
| **Apple Silicon optimizations** | `mlx-whisper` |
| **Translation** | `nllw` |
| **Speaker diarization** | `git+https://github.com/NVIDIA/NeMo.git@main#egg=nemo_toolkit[asr]` |
| OpenAI API | `openai` |
| *[Not recommanded]* Speaker diarization with Diart | `diart` |
| *[Not recommanded]* Original Whisper backend | `whisper` |
| *[Not recommanded]* Improved timestamps backend | `whisper-timestamped` |
| OpenAI API backend | `openai` |
See **Parameters & Configuration** below on how to use them.
@@ -86,10 +86,10 @@ See **Parameters & Configuration** below on how to use them.
```bash
# Large model and translate from french to danish
whisperlivekit-server --model large-v3 --language fr --target-language da
wlk --model large-v3 --language fr --target-language da
# Diarization and server listening on */80
whisperlivekit-server --host 0.0.0.0 --port 80 --model medium --diarization --language fr
wlk --host 0.0.0.0 --port 80 --model medium --diarization --language fr
```
@@ -140,12 +140,12 @@ async def websocket_endpoint(websocket: WebSocket):
| Parameter | Description | Default |
|-----------|-------------|---------|
| `--model` | Whisper model size. List and recommandations [here](https://github.com/QuentinFuxa/WhisperLiveKit/blob/main/docs/available_models.md) | `small` |
| `--model-path` | .pt file/directory containing whisper model. Overrides `--model`. Recommandations [here](https://github.com/QuentinFuxa/WhisperLiveKit/blob/main/docs/models_compatible_formats.md) | `None` |
| `--language` | List [here](https://github.com/QuentinFuxa/WhisperLiveKit/blob/main/whisperlivekit/simul_whisper/whisper/tokenizer.py). If you use `auto`, the model attempts to detect the language automatically, but it tends to bias towards English. | `auto` |
| `--target-language` | If sets, translate to using NLLB. Ex: `fr`. [200 languages available](https://github.com/QuentinFuxa/WhisperLiveKit/blob/main/docs/supported_languages.md). If you want to translate to english, you should rather use `--task translate`, since Whisper can do it directly. | `None` |
| `--task` | Set to `translate` to translate *only* to english, using Whisper translation. | `transcribe` |
| `--model-path` | Local .pt file/directory **or** Hugging Face repo ID containing the Whisper model. Overrides `--model`. Recommandations [here](https://github.com/QuentinFuxa/WhisperLiveKit/blob/main/docs/models_compatible_formats.md) | `None` |
| `--language` | List [here](https://github.com/QuentinFuxa/WhisperLiveKit/blob/main/whisperlivekit/whisper/tokenizer.py). If you use `auto`, the model attempts to detect the language automatically, but it tends to bias towards English. | `auto` |
| `--target-language` | If sets, translates using [NLLW](https://github.com/QuentinFuxa/NoLanguageLeftWaiting). [200 languages available](https://github.com/QuentinFuxa/WhisperLiveKit/blob/main/docs/supported_languages.md). If you want to translate to english, you can also use `--direct-english-translation`. The STT model will try to directly output the translation. | `None` |
| `--diarization` | Enable speaker identification | `False` |
| `--backend` | Processing backend. You can switch to `faster-whisper` if `simulstreaming` does not work correctly | `simulstreaming` |
| `--backend-policy` | Streaming strategy: `1`/`simulstreaming` uses AlignAtt SimulStreaming, `2`/`localagreement` uses the LocalAgreement policy | `simulstreaming` |
| `--backend` | Whisper implementation selector. `auto` picks MLX on macOS (if installed), otherwise Faster-Whisper, otherwise vanilla Whisper. You can also force `mlx-whisper`, `faster-whisper`, `whisper`, or `openai-api` (LocalAgreement only) | `auto` |
| `--no-vac` | Disable Voice Activity Controller | `False` |
| `--no-vad` | Disable Voice Activity Detection | `False` |
| `--warmup-file` | Audio file path for model warmup | `jfk.wav` |

View File

@@ -0,0 +1,71 @@
### Alignment between STT Tokens and Diarization Segments
- Example 1: The punctuation from STT and the speaker change from Diariation come in the prediction `t`
- Example 2: The punctuation from STT comes from prediction `t`, but the speaker change from Diariation come in the prediction `t-1`
- Example 3: The punctuation from STT comes from prediction `t-1`, but the speaker change from Diariation come in the prediction `t`
> `#` Is the split between the `t-1` prediction and `t` prediction.
## Example 1:
```text
punctuations_segments : __#_______.__________________!____
diarization_segments:
SPK1 __#____________
SPK2 # ___________________
-->
ALIGNED SPK1 __#_______.
ALIGNED SPK2 # __________________!____
t-1 output:
SPK1: __#
SPK2: NO
DIARIZATION BUFFER: NO
t output:
SPK1: __#__.
SPK2: __________________!____
DIARIZATION BUFFER: No
```
## Example 2:
```text
punctuations_segments : _____#__.___________
diarization_segments:
SPK1 ___ #
SPK2 __#______________
-->
ALIGNED SPK1 _____#__.
ALIGNED SPK2 # ___________
t-1 output:
SPK1: ___ #
SPK2:
DIARIZATION BUFFER: __#
t output:
SPK1: __#__.
SPK2: ___________
DIARIZATION BUFFER: No
```
## Example 3:
```text
punctuations_segments : ___.__#__________
diarization_segments:
SPK1 ______#__
SPK2 # ________
-->
ALIGNED SPK1 ___. #
ALIGNED SPK2 __#__________
t-1 output:
SPK1: ___. #
SPK2:
DIARIZATION BUFFER: __#
t output:
SPK1: #
SPK2: __#___________
DIARIZATION BUFFER: NO
```

View File

@@ -13,4 +13,7 @@ May optionally contain:
- **`.bin` file** - faster-whisper model for encoder (requires faster-whisper)
- **`weights.npz`** or **`weights.safetensors`** - for encoder (requires whisper-mlx)
To improve speed/reduce allucinations, you may want to use `scripts/determine_alignment_heads.py` to determine the alignment heads to use for your model, and use the `--custom-alignment-heads` to pass them to WLK. If not, alignement heads are set to be all the heads of the last half layer of decoder.
## Hugging Face Repo ID
- Provide the repo ID (e.g. `openai/whisper-large-v3`) and WhisperLiveKit will download and cache the snapshot automatically. For gated repos, authenticate via `huggingface-cli login` first.
To improve speed/reduce allucinations, you may want to use `scripts/determine_alignment_heads.py` to determine the alignment heads to use for your model, and use the `--custom-alignment-heads` to pass them to WLK. If not, alignement heads are set to be all the heads of the last half layer of decoder.

View File

@@ -0,0 +1,43 @@
# Technical Integration Guide
This document introduce how to reuse the core components when you do **not** want to ship the bundled frontend, FastAPI server, or even the provided CLI.
---
## 1. Runtime Components
| Layer | File(s) | Purpose |
|-------|---------|---------|
| Transport | `whisperlivekit/basic_server.py`, any ASGI/WebSocket server | Accepts audio over WebSocket (MediaRecorder WebM or raw PCM chunks) and streams JSON updates back |
| Audio processing | `whisperlivekit/audio_processor.py` | Buffers audio, orchestrates transcription, diarization, translation, handles FFmpeg/PCM input |
| Engines | `whisperlivekit/core.py`, `whisperlivekit/simul_whisper/*`, `whisperlivekit/local_agreement/*` | Load models once (SimulStreaming or LocalAgreement), expose `TranscriptionEngine` and helpers |
| Frontends | `whisperlivekit/web/*`, `chrome-extension/*` | Optional UI layers feeding the WebSocket endpoint |
**Key idea:** The server boundary is just `AudioProcessor.process_audio()` for incoming bytes and the async generator returned by `AudioProcessor.create_tasks()` for outgoing updates (`FrontData`). Everything else is optional.
---
## 2. Running Without the Bundled Frontend
1. Start the server/engine however you like:
```bash
wlk --model small --language en --host 0.0.0.0 --port 9000
# or launch your own app that instantiates TranscriptionEngine(...)
```
2. Build your own client (browser, mobile, desktop) that:
- Opens `ws(s)://<host>:<port>/asr`
- Sends either MediaRecorder/Opus WebM blobs **or** raw PCM (`--pcm-input` on the server tells the client to use the AudioWorklet).
- Consumes the JSON payload defined in `docs/API.md`.
---
## 3. Running Without FastAPI
`whisperlivekit/basic_server.py` is just an example. Any async framework works, as long as you:
1. Create a global `TranscriptionEngine` (expensive to initialize; reuse it).
2. Instantiate `AudioProcessor(transcription_engine=engine)` for each connection.
3. Call `create_tasks()` to get the async generator, `process_audio()` with incoming bytes, and ensure `cleanup()` runs when the client disconnects.
If you prefer to send compressed audio, instantiate `AudioProcessor(pcm_input=False)` and pipe encoded chunks through `FFmpegManager` transparently—just ensure `ffmpeg` is available or be ready to handle the `"ffmpeg_not_found"` error in the streamed `FrontData`.

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "whisperlivekit"
version = "0.2.13.post1"
version = "0.2.15"
description = "Real-time speech-to-text with speaker diarization using Whisper"
readme = "README.md"
authors = [
@@ -34,6 +34,7 @@ dependencies = [
"websockets",
"torchaudio>=2.0.0",
"torch>=2.0.0",
"huggingface-hub>=0.25.0",
"tqdm",
"tiktoken",
'triton>=2.0.0; platform_machine == "x86_64" and (sys_platform == "linux" or sys_platform == "linux2")'
@@ -48,21 +49,22 @@ Homepage = "https://github.com/QuentinFuxa/WhisperLiveKit"
[project.scripts]
whisperlivekit-server = "whisperlivekit.basic_server:main"
wlk = "whisperlivekit.basic_server:main"
[tool.setuptools]
packages = [
"whisperlivekit",
"whisperlivekit.diarization",
"whisperlivekit.simul_whisper",
"whisperlivekit.simul_whisper.whisper",
"whisperlivekit.simul_whisper.whisper.assets",
"whisperlivekit.simul_whisper.whisper.normalizers",
"whisperlivekit.whisper",
"whisperlivekit.whisper.assets",
"whisperlivekit.whisper.normalizers",
"whisperlivekit.web",
"whisperlivekit.whisper_streaming_custom",
"whisperlivekit.local_agreement",
"whisperlivekit.vad_models"
]
[tool.setuptools.package-data]
whisperlivekit = ["web/*.html", "web/*.css", "web/*.js", "web/src/*.svg"]
"whisperlivekit.simul_whisper.whisper.assets" = ["*.tiktoken", "*.npz"]
"whisperlivekit.whisper.assets" = ["*.tiktoken", "*.npz"]
"whisperlivekit.vad_models" = ["*.jit", "*.onnx"]

View File

@@ -0,0 +1,153 @@
#!/usr/bin/env python3
"""
Convert a Hugging Face style Whisper checkpoint into a WhisperLiveKit .pt file.
Optionally shrink the supported audio chunk length (in seconds) by trimming the
encoder positional embeddings and updating the stored model dimensions.
"""
import argparse
import json
import os
from pathlib import Path
from typing import Dict, Tuple
import torch
from whisperlivekit.whisper.audio import HOP_LENGTH, SAMPLE_RATE
from whisperlivekit.whisper.model import ModelDimensions
from whisperlivekit.whisper.utils import exact_div
from whisperlivekit.whisper import _convert_hf_state_dict
def _load_state_dict(repo_path: Path) -> Dict[str, torch.Tensor]:
safetensor_path = repo_path / "model.safetensors"
bin_path = repo_path / "pytorch_model.bin"
if safetensor_path.is_file():
try:
from safetensors.torch import load_file # type: ignore
except Exception as exc: # pragma: no cover - import guard
raise RuntimeError(
"Install safetensors to load model.safetensors "
"(pip install safetensors)"
) from exc
return load_file(str(safetensor_path))
if bin_path.is_file():
return torch.load(bin_path, map_location="cpu")
raise FileNotFoundError(
f"Could not find model.safetensors or pytorch_model.bin under {repo_path}"
)
def _load_config(repo_path: Path) -> Dict:
config_path = repo_path / "config.json"
if not config_path.is_file():
raise FileNotFoundError(
f"Hugging Face checkpoint at {repo_path} is missing config.json"
)
with open(config_path, "r", encoding="utf-8") as fp:
return json.load(fp)
def _derive_audio_ctx(chunk_length: float) -> Tuple[int, int]:
n_samples = int(round(chunk_length * SAMPLE_RATE))
expected_samples = chunk_length * SAMPLE_RATE
if abs(n_samples - expected_samples) > 1e-6:
raise ValueError(
"chunk_length must align with sample rate so that "
"chunk_length * SAMPLE_RATE is an integer"
)
n_frames = exact_div(n_samples, HOP_LENGTH)
n_audio_ctx = exact_div(n_frames, 2)
return n_frames, n_audio_ctx
def _build_dims(config: Dict, chunk_length: float) -> Dict:
base_dims = ModelDimensions(
n_mels=config["num_mel_bins"],
n_audio_ctx=config["max_source_positions"],
n_audio_state=config["d_model"],
n_audio_head=config["encoder_attention_heads"],
n_audio_layer=config.get("encoder_layers") or config["num_hidden_layers"],
n_vocab=config["vocab_size"],
n_text_ctx=config["max_target_positions"],
n_text_state=config["d_model"],
n_text_head=config["decoder_attention_heads"],
n_text_layer=config["decoder_layers"],
).__dict__.copy()
_, n_audio_ctx = _derive_audio_ctx(chunk_length)
base_dims["n_audio_ctx"] = n_audio_ctx
base_dims["chunk_length"] = chunk_length
return base_dims
def _trim_positional_embedding(
state_dict: Dict[str, torch.Tensor], target_ctx: int
) -> None:
key = "encoder.positional_embedding"
if key not in state_dict:
raise KeyError(f"{key} missing from converted state dict")
tensor = state_dict[key]
if tensor.shape[0] < target_ctx:
raise ValueError(
f"Cannot increase encoder ctx from {tensor.shape[0]} to {target_ctx}"
)
if tensor.shape[0] == target_ctx:
return
state_dict[key] = tensor[:target_ctx].contiguous()
def convert_checkpoint(hf_path: Path, output_path: Path, chunk_length: float) -> None:
state_dict = _load_state_dict(hf_path)
converted = _convert_hf_state_dict(state_dict)
config = _load_config(hf_path)
dims = _build_dims(config, chunk_length)
_trim_positional_embedding(converted, dims["n_audio_ctx"])
package = {"dims": dims, "model_state_dict": converted}
output_path.parent.mkdir(parents=True, exist_ok=True)
torch.save(package, output_path)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Convert Hugging Face Whisper checkpoint to WhisperLiveKit format."
)
parser.add_argument(
"hf_path",
type=str,
help="Path to the cloned Hugging Face repository (e.g. whisper-tiny.en)",
)
parser.add_argument(
"--output",
type=str,
default="converted-whisper.pt",
help="Destination path for the .pt file",
)
parser.add_argument(
"--chunk-length",
type=float,
default=30.0,
help="Audio chunk length in seconds to support (default: 30)",
)
return parser.parse_args()
def main():
args = parse_args()
hf_path = Path(os.path.expanduser(args.hf_path)).resolve()
output_path = Path(os.path.expanduser(args.output)).resolve()
convert_checkpoint(hf_path, output_path, args.chunk_length)
print(f"Saved converted checkpoint to {output_path}")
if __name__ == "__main__":
main()

View File

@@ -1,43 +1,46 @@
import asyncio
import numpy as np
from time import time, sleep
import math
from time import time
import logging
import traceback
from typing import Optional, Union, List, Any, AsyncGenerator
from whisperlivekit.timed_objects import ASRToken, Silence, Line, FrontData, State, Transcript, ChangeSpeaker
from whisperlivekit.core import TranscriptionEngine, online_factory, online_diarization_factory, online_translation_factory
from whisperlivekit.silero_vad_iterator import FixedVADIterator
from whisperlivekit.results_formater import format_output
from whisperlivekit.ffmpeg_manager import FFmpegManager, FFmpegState
from whisperlivekit.tokens_alignment import TokensAlignment
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
SENTINEL = object() # unique sentinel object for end of stream marker
MIN_DURATION_REAL_SILENCE = 5
def cut_at(cumulative_pcm, cut_sec):
cumulative_len = 0
cut_sample = int(cut_sec * 16000)
async def get_all_from_queue(queue: asyncio.Queue) -> Union[object, Silence, np.ndarray, List[Any]]:
items: List[Any] = []
first_item = await queue.get()
queue.task_done()
if first_item is SENTINEL:
return first_item
if isinstance(first_item, Silence):
return first_item
items.append(first_item)
for ind, pcm_array in enumerate(cumulative_pcm):
if (cumulative_len + len(pcm_array)) >= cut_sample:
cut_chunk = cut_sample - cumulative_len
before = np.concatenate(cumulative_pcm[:ind] + [cumulative_pcm[ind][:cut_chunk]])
after = [cumulative_pcm[ind][cut_chunk:]] + cumulative_pcm[ind+1:]
return before, after
cumulative_len += len(pcm_array)
return np.concatenate(cumulative_pcm), []
async def get_all_from_queue(queue):
items = []
try:
while True:
item = queue.get_nowait()
items.append(item)
except asyncio.QueueEmpty:
pass
return items
while True:
if not queue._queue:
break
next_item = queue._queue[0]
if next_item is SENTINEL:
break
if isinstance(next_item, Silence):
break
items.append(await queue.get())
queue.task_done()
if isinstance(items[0], np.ndarray):
return np.concatenate(items)
else: #translation
return items
class AudioProcessor:
"""
@@ -45,7 +48,7 @@ class AudioProcessor:
Handles audio processing, state management, and result formatting.
"""
def __init__(self, **kwargs):
def __init__(self, **kwargs: Any) -> None:
"""Initialize the audio processor with configuration, models, and state."""
if 'transcription_engine' in kwargs and isinstance(kwargs['transcription_engine'], TranscriptionEngine):
@@ -64,36 +67,27 @@ class AudioProcessor:
self.is_pcm_input = self.args.pcm_input
# State management
self.is_stopping = False
self.silence = False
self.silence_duration = 0.0
self.state = State()
self.lock = asyncio.Lock()
self.sep = " " # Default separator
self.last_response_content = FrontData()
self.last_detected_speaker = None
self.speaker_languages = {}
self.diarization_before_transcription = False
self.is_stopping: bool = False
self.current_silence: Optional[Silence] = None
self.state: State = State()
self.lock: asyncio.Lock = asyncio.Lock()
self.sep: str = " " # Default separator
self.last_response_content: FrontData = FrontData()
self.segments = []
self.tokens_alignment: TokensAlignment = TokensAlignment(self.state, self.args, self.sep)
self.beg_loop: Optional[float] = None
if self.diarization_before_transcription:
self.cumulative_pcm = []
self.last_start = 0.0
self.last_end = 0.0
# Models and processing
self.asr = models.asr
self.vac_model = models.vac_model
self.asr: Any = models.asr
self.vac_model: Any = models.vac_model
if self.args.vac:
self.vac = FixedVADIterator(models.vac_model)
self.vac: Optional[FixedVADIterator] = FixedVADIterator(models.vac_model)
else:
self.vac = None
self.vac: Optional[FixedVADIterator] = None
self.ffmpeg_manager = None
self.ffmpeg_reader_task = None
self._ffmpeg_error = None
self.ffmpeg_manager: Optional[FFmpegManager] = None
self.ffmpeg_reader_task: Optional[asyncio.Task] = None
self._ffmpeg_error: Optional[str] = None
if not self.is_pcm_input:
self.ffmpeg_manager = FFmpegManager(
@@ -105,20 +99,20 @@ class AudioProcessor:
self._ffmpeg_error = error_type
self.ffmpeg_manager.on_error_callback = handle_ffmpeg_error
self.transcription_queue = asyncio.Queue() if self.args.transcription else None
self.diarization_queue = asyncio.Queue() if self.args.diarization else None
self.translation_queue = asyncio.Queue() if self.args.target_language else None
self.pcm_buffer = bytearray()
self.transcription_task = None
self.diarization_task = None
self.translation_task = None
self.watchdog_task = None
self.all_tasks_for_cleanup = []
self.transcription_queue: Optional[asyncio.Queue] = asyncio.Queue() if self.args.transcription else None
self.diarization_queue: Optional[asyncio.Queue] = asyncio.Queue() if self.args.diarization else None
self.translation_queue: Optional[asyncio.Queue] = asyncio.Queue() if self.args.target_language else None
self.pcm_buffer: bytearray = bytearray()
self.total_pcm_samples: int = 0
self.transcription_task: Optional[asyncio.Task] = None
self.diarization_task: Optional[asyncio.Task] = None
self.translation_task: Optional[asyncio.Task] = None
self.watchdog_task: Optional[asyncio.Task] = None
self.all_tasks_for_cleanup: List[asyncio.Task] = []
self.transcription = None
self.translation = None
self.diarization = None
self.transcription: Optional[Any] = None
self.translation: Optional[Any] = None
self.diarization: Optional[Any] = None
if self.args.transcription:
self.transcription = online_factory(self.args, models.asr)
@@ -128,27 +122,67 @@ class AudioProcessor:
if models.translation_model:
self.translation = online_translation_factory(self.args, models.translation_model)
def convert_pcm_to_float(self, pcm_buffer):
async def _push_silence_event(self) -> None:
if self.transcription_queue:
await self.transcription_queue.put(self.current_silence)
if self.args.diarization and self.diarization_queue:
await self.diarization_queue.put(self.current_silence)
if self.translation_queue:
await self.translation_queue.put(self.current_silence)
async def _begin_silence(self) -> None:
if self.current_silence:
return
now = time() - self.beg_loop
self.current_silence = Silence(
is_starting=True, start=now
)
await self._push_silence_event()
async def _end_silence(self) -> None:
if not self.current_silence:
return
now = time() - self.beg_loop
self.current_silence.end = now
self.current_silence.is_starting=False
self.current_silence.has_ended=True
self.current_silence.compute_duration()
if self.current_silence.duration > MIN_DURATION_REAL_SILENCE:
self.state.new_tokens.append(self.current_silence)
await self._push_silence_event()
self.current_silence = None
async def _enqueue_active_audio(self, pcm_chunk: np.ndarray) -> None:
if pcm_chunk is None or pcm_chunk.size == 0:
return
if self.transcription_queue:
await self.transcription_queue.put(pcm_chunk.copy())
if self.args.diarization and self.diarization_queue:
await self.diarization_queue.put(pcm_chunk.copy())
def _slice_before_silence(self, pcm_array: np.ndarray, chunk_sample_start: int, silence_sample: Optional[int]) -> Optional[np.ndarray]:
if silence_sample is None:
return None
relative_index = int(silence_sample - chunk_sample_start)
if relative_index <= 0:
return None
split_index = min(relative_index, len(pcm_array))
if split_index <= 0:
return None
return pcm_array[:split_index]
def convert_pcm_to_float(self, pcm_buffer: Union[bytes, bytearray]) -> np.ndarray:
"""Convert PCM buffer in s16le format to normalized NumPy array."""
return np.frombuffer(pcm_buffer, dtype=np.int16).astype(np.float32) / 32768.0
async def add_dummy_token(self):
"""Placeholder token when no transcription is available."""
async with self.lock:
current_time = time() - self.state.beg_loop
self.state.tokens.append(ASRToken(
start=current_time, end=current_time + 1,
text=".", speaker=-1, is_dummy=True
))
async def get_current_state(self):
async def get_current_state(self) -> State:
"""Get current state."""
async with self.lock:
current_time = time()
remaining_transcription = 0
if self.state.end_buffer > 0:
remaining_transcription = max(0, round(current_time - self.state.beg_loop - self.state.end_buffer, 1))
remaining_transcription = max(0, round(current_time - self.beg_loop - self.state.end_buffer, 1))
remaining_diarization = 0
if self.state.tokens:
@@ -160,7 +194,7 @@ class AudioProcessor:
return self.state
async def ffmpeg_stdout_reader(self):
async def ffmpeg_stdout_reader(self) -> None:
"""Read audio data from FFmpeg stdout and process it into the PCM pipeline."""
beg = time()
while True:
@@ -203,50 +237,60 @@ class AudioProcessor:
await asyncio.sleep(0.2)
logger.info("FFmpeg stdout processing finished. Signaling downstream processors if needed.")
if not self.diarization_before_transcription and self.transcription_queue:
if self.transcription_queue:
await self.transcription_queue.put(SENTINEL)
if self.diarization:
await self.diarization_queue.put(SENTINEL)
if self.translation:
await self.translation_queue.put(SENTINEL)
async def transcription_processor(self):
async def transcription_processor(self) -> None:
"""Process audio chunks for transcription."""
cumulative_pcm_duration_stream_time = 0.0
while True:
try:
item = await self.transcription_queue.get()
# item = await self.transcription_queue.get()
item = await get_all_from_queue(self.transcription_queue)
if item is SENTINEL:
logger.debug("Transcription processor received sentinel. Finishing.")
self.transcription_queue.task_done()
break
asr_internal_buffer_duration_s = len(getattr(self.transcription, 'audio_buffer', [])) / self.transcription.SAMPLING_RATE
transcription_lag_s = max(0.0, time() - self.state.beg_loop - self.state.end_buffer)
transcription_lag_s = max(0.0, time() - self.beg_loop - self.state.end_buffer)
asr_processing_logs = f"internal_buffer={asr_internal_buffer_duration_s:.2f}s | lag={transcription_lag_s:.2f}s |"
if type(item) is Silence:
asr_processing_logs += f" + Silence of = {item.duration:.2f}s"
stream_time_end_of_current_pcm = cumulative_pcm_duration_stream_time
new_tokens = []
current_audio_processed_upto = self.state.end_buffer
if isinstance(item, Silence):
if item.is_starting:
new_tokens, current_audio_processed_upto = await asyncio.to_thread(
self.transcription.start_silence
)
asr_processing_logs += f" + Silence starting"
if item.has_ended:
asr_processing_logs += f" + Silence of = {item.duration:.2f}s"
cumulative_pcm_duration_stream_time += item.duration
current_audio_processed_upto = cumulative_pcm_duration_stream_time
self.transcription.end_silence(item.duration, self.state.tokens[-1].end if self.state.tokens else 0)
if self.state.tokens:
asr_processing_logs += f" | last_end = {self.state.tokens[-1].end} |"
logger.info(asr_processing_logs)
cumulative_pcm_duration_stream_time += item.duration
self.transcription.insert_silence(item.duration, self.state.tokens[-1].end if self.state.tokens else 0)
continue
new_tokens = new_tokens or []
current_audio_processed_upto = max(current_audio_processed_upto, stream_time_end_of_current_pcm)
elif isinstance(item, ChangeSpeaker):
self.transcription.new_speaker(item)
continue
elif isinstance(item, np.ndarray):
pcm_array = item
logger.info(asr_processing_logs)
duration_this_chunk = len(pcm_array) / self.sample_rate
cumulative_pcm_duration_stream_time += duration_this_chunk
stream_time_end_of_current_pcm = cumulative_pcm_duration_stream_time
logger.info(asr_processing_logs)
cumulative_pcm_duration_stream_time += len(pcm_array) / self.sample_rate
stream_time_end_of_current_pcm = cumulative_pcm_duration_stream_time
self.transcription.insert_audio_chunk(pcm_array, stream_time_end_of_current_pcm)
new_tokens, current_audio_processed_upto = await asyncio.to_thread(self.transcription.process_iter)
new_tokens = new_tokens or []
self.transcription.insert_audio_chunk(pcm_array, stream_time_end_of_current_pcm)
new_tokens, current_audio_processed_upto = await asyncio.to_thread(self.transcription.process_iter)
_buffer_transcript = self.transcription.get_buffer()
buffer_text = _buffer_transcript.text
@@ -269,13 +313,12 @@ class AudioProcessor:
self.state.tokens.extend(new_tokens)
self.state.buffer_transcription = _buffer_transcript
self.state.end_buffer = max(candidate_end_times)
self.state.new_tokens.extend(new_tokens)
self.state.new_tokens_buffer = _buffer_transcript
if self.translation_queue:
for token in new_tokens:
await self.translation_queue.put(token)
self.transcription_queue.task_done()
await self.translation_queue.put(token)
except Exception as e:
logger.warning(f"Exception in transcription_processor: {e}")
logger.warning(f"Traceback: {traceback.format_exc()}")
@@ -292,124 +335,57 @@ class AudioProcessor:
logger.info("Transcription processor task finished.")
async def diarization_processor(self, diarization_obj):
"""Process audio chunks for speaker diarization."""
if self.diarization_before_transcription:
self.current_speaker = 0
await self.transcription_queue.put(ChangeSpeaker(speaker=self.current_speaker, start=0.0))
async def diarization_processor(self) -> None:
while True:
try:
item = await self.diarization_queue.get()
item = await get_all_from_queue(self.diarization_queue)
if item is SENTINEL:
logger.debug("Diarization processor received sentinel. Finishing.")
self.diarization_queue.task_done()
break
elif type(item) is Silence:
diarization_obj.insert_silence(item.duration)
if item.has_ended:
self.diarization.insert_silence(item.duration)
continue
elif isinstance(item, np.ndarray):
pcm_array = item
else:
raise Exception('item should be pcm_array')
# Process diarization
await diarization_obj.diarize(pcm_array)
if self.diarization_before_transcription:
segments = diarization_obj.get_segments()
self.cumulative_pcm.append(pcm_array)
if segments:
last_segment = segments[-1]
if last_segment.speaker != self.current_speaker:
cut_sec = last_segment.start - self.last_end
to_transcript, self.cumulative_pcm = cut_at(self.cumulative_pcm, cut_sec)
await self.transcription_queue.put(to_transcript)
self.current_speaker = last_segment.speaker
await self.transcription_queue.put(ChangeSpeaker(speaker=self.current_speaker, start=last_segment.start))
cut_sec = last_segment.end - last_segment.start
to_transcript, self.cumulative_pcm = cut_at(self.cumulative_pcm, cut_sec)
await self.transcription_queue.put(to_transcript)
self.last_start = last_segment.start
self.last_end = last_segment.end
else:
cut_sec = last_segment.end - self.last_end
to_transcript, self.cumulative_pcm = cut_at(self.cumulative_pcm, cut_sec)
await self.transcription_queue.put(to_transcript)
self.last_end = last_segment.end
elif not self.diarization_before_transcription:
async with self.lock:
self.state.tokens = diarization_obj.assign_speakers_to_tokens(
self.state.tokens,
use_punctuation_split=self.args.punctuation_split
)
if len(self.state.tokens) > 0:
self.state.end_attributed_speaker = max(self.state.tokens[-1].end, self.state.end_attributed_speaker)
self.diarization_queue.task_done()
self.diarization.insert_audio_chunk(item)
diarization_segments = await self.diarization.diarize()
self.state.new_diarization = diarization_segments
except Exception as e:
logger.warning(f"Exception in diarization_processor: {e}")
logger.warning(f"Traceback: {traceback.format_exc()}")
if 'pcm_array' in locals() and pcm_array is not SENTINEL:
self.diarization_queue.task_done()
logger.info("Diarization processor task finished.")
async def translation_processor(self):
async def translation_processor(self) -> None:
# the idea is to ignore diarization for the moment. We use only transcription tokens.
# And the speaker is attributed given the segments used for the translation
# in the future we want to have different languages for each speaker etc, so it will be more complex.
while True:
try:
item = await self.translation_queue.get() #block until at least 1 token
item = await get_all_from_queue(self.translation_queue)
if item is SENTINEL:
logger.debug("Translation processor received sentinel. Finishing.")
self.translation_queue.task_done()
break
elif type(item) is Silence:
self.translation.insert_silence(item.duration)
continue
# get all the available tokens for translation. The more words, the more precise
tokens_to_process = [item]
additional_tokens = await get_all_from_queue(self.translation_queue)
sentinel_found = False
for additional_token in additional_tokens:
if additional_token is SENTINEL:
sentinel_found = True
break
elif type(additional_token) is Silence:
self.translation.insert_silence(additional_token.duration)
if item.is_starting:
new_translation, new_translation_buffer = self.translation.validate_buffer_and_reset()
if item.has_ended:
self.translation.insert_silence(item.duration)
continue
else:
tokens_to_process.append(additional_token)
if tokens_to_process:
self.translation.insert_tokens(tokens_to_process)
translation_validated_segments, translation_buffer = await asyncio.to_thread(self.translation.process)
async with self.lock:
self.state.translation_validated_segments = translation_validated_segments
self.state.translation_buffer = translation_buffer
self.translation_queue.task_done()
for _ in additional_tokens:
self.translation_queue.task_done()
if sentinel_found:
logger.debug("Translation processor received sentinel in batch. Finishing.")
break
elif isinstance(item, ChangeSpeaker):
new_translation, new_translation_buffer = self.translation.validate_buffer_and_reset()
pass
else:
self.translation.insert_tokens(item)
new_translation, new_translation_buffer = await asyncio.to_thread(self.translation.process)
async with self.lock:
self.state.new_translation.append(new_translation)
self.state.new_translation_buffer = new_translation_buffer
except Exception as e:
logger.warning(f"Exception in translation_processor: {e}")
logger.warning(f"Traceback: {traceback.format_exc()}")
if 'token' in locals() and item is not SENTINEL:
self.translation_queue.task_done()
if 'additional_tokens' in locals():
for _ in additional_tokens:
self.translation_queue.task_done()
logger.info("Translation processor task finished.")
async def results_formatter(self):
async def results_formatter(self) -> AsyncGenerator[FrontData, None]:
"""Format processing results for output."""
while True:
try:
@@ -419,48 +395,32 @@ class AudioProcessor:
await asyncio.sleep(1)
continue
state = await self.get_current_state()
lines, undiarized_text = format_output(
state,
self.silence,
args = self.args,
sep=self.sep
self.tokens_alignment.update()
lines, buffer_diarization_text, buffer_translation_text = self.tokens_alignment.get_lines(
diarization=self.args.diarization,
translation=bool(self.translation),
current_silence=self.current_silence
)
if lines and lines[-1].speaker == -2:
buffer_transcription = Transcript()
else:
buffer_transcription = state.buffer_transcription
state = await self.get_current_state()
buffer_diarization = ''
if undiarized_text:
buffer_diarization = self.sep.join(undiarized_text)
buffer_transcription_text = state.buffer_transcription.text if state.buffer_transcription else ''
async with self.lock:
self.state.end_attributed_speaker = state.end_attributed_speaker
response_status = "active_transcription"
if not state.tokens and not buffer_transcription and not buffer_diarization:
if not lines and not buffer_transcription_text and not buffer_diarization_text:
response_status = "no_audio_detected"
lines = []
elif not lines:
lines = [Line(
speaker=1,
start=state.end_buffer,
end=state.end_buffer
)]
response = FrontData(
status=response_status,
lines=lines,
buffer_transcription=buffer_transcription.text.strip(),
buffer_diarization=buffer_diarization,
buffer_transcription=buffer_transcription_text,
buffer_diarization=buffer_diarization_text,
buffer_translation=buffer_translation_text,
remaining_time_transcription=state.remaining_time_transcription,
remaining_time_diarization=state.remaining_time_diarization if self.args.diarization else 0
)
should_push = (response != self.last_response_content)
if should_push and (lines or buffer_transcription or buffer_diarization or response_status == "no_audio_detected"):
if should_push:
yield response
self.last_response_content = response
@@ -474,17 +434,17 @@ class AudioProcessor:
logger.warning(f"Exception in results_formatter. Traceback: {traceback.format_exc()}")
await asyncio.sleep(0.5)
async def create_tasks(self):
async def create_tasks(self) -> AsyncGenerator[FrontData, None]:
"""Create and start processing tasks."""
self.all_tasks_for_cleanup = []
processing_tasks_for_watchdog = []
processing_tasks_for_watchdog: List[asyncio.Task] = []
# If using FFmpeg (non-PCM input), start it and spawn stdout reader
if not self.is_pcm_input:
success = await self.ffmpeg_manager.start()
if not success:
logger.error("Failed to start FFmpeg manager")
async def error_generator():
async def error_generator() -> AsyncGenerator[FrontData, None]:
yield FrontData(
status="error",
error="FFmpeg failed to start. Please check that FFmpeg is installed."
@@ -500,7 +460,7 @@ class AudioProcessor:
processing_tasks_for_watchdog.append(self.transcription_task)
if self.diarization:
self.diarization_task = asyncio.create_task(self.diarization_processor(self.diarization))
self.diarization_task = asyncio.create_task(self.diarization_processor())
self.all_tasks_for_cleanup.append(self.diarization_task)
processing_tasks_for_watchdog.append(self.diarization_task)
@@ -515,9 +475,9 @@ class AudioProcessor:
return self.results_formatter()
async def watchdog(self, tasks_to_monitor):
async def watchdog(self, tasks_to_monitor: List[asyncio.Task]) -> None:
"""Monitors the health of critical processing tasks."""
tasks_remaining = [task for task in tasks_to_monitor if task]
tasks_remaining: List[asyncio.Task] = [task for task in tasks_to_monitor if task]
while True:
try:
if not tasks_remaining:
@@ -542,7 +502,7 @@ class AudioProcessor:
except Exception as e:
logger.error(f"Error in watchdog task: {e}", exc_info=True)
async def cleanup(self):
async def cleanup(self) -> None:
"""Clean up resources when processing is complete."""
logger.info("Starting cleanup of AudioProcessor resources.")
self.is_stopping = True
@@ -565,7 +525,7 @@ class AudioProcessor:
self.diarization.close()
logger.info("AudioProcessor cleanup complete.")
def _processing_tasks_done(self):
def _processing_tasks_done(self) -> bool:
"""Return True when all active processing tasks have completed."""
tasks_to_check = [
self.transcription_task,
@@ -576,11 +536,13 @@ class AudioProcessor:
return all(task.done() for task in tasks_to_check if task)
async def process_audio(self, message):
async def process_audio(self, message: Optional[bytes]) -> None:
"""Process incoming audio data."""
if not self.state.beg_loop:
self.state.beg_loop = time()
if not self.beg_loop:
self.beg_loop = time()
self.current_silence = Silence(start=0.0, is_starting=True)
self.tokens_alignment.beg_loop = self.beg_loop
if not message:
logger.info("Empty audio message received, initiating stop sequence.")
@@ -613,7 +575,7 @@ class AudioProcessor:
else:
logger.warning("Failed to write audio data to FFmpeg")
async def handle_pcm_data(self):
async def handle_pcm_data(self) -> None:
# Process when enough data
if len(self.pcm_buffer) < self.bytes_per_sec:
return
@@ -632,40 +594,30 @@ class AudioProcessor:
pcm_array = self.convert_pcm_to_float(self.pcm_buffer[:aligned_chunk_size])
self.pcm_buffer = self.pcm_buffer[aligned_chunk_size:]
res = None
end_of_audio = False
silence_buffer = None
num_samples = len(pcm_array)
chunk_sample_start = self.total_pcm_samples
chunk_sample_end = chunk_sample_start + num_samples
res = None
if self.args.vac:
res = self.vac(pcm_array)
if res is not None:
if res.get("end", 0) > res.get("start", 0):
end_of_audio = True
elif self.silence: #end of silence
self.silence = False
silence_buffer = Silence(duration=time() - self.start_silence)
silence_detected = res.get("end", 0) > res.get("start", 0)
if silence_detected and not self.current_silence:
pre_silence_chunk = self._slice_before_silence(
pcm_array, chunk_sample_start, res.get("end")
)
if pre_silence_chunk is not None and pre_silence_chunk.size > 0:
await self._enqueue_active_audio(pre_silence_chunk)
await self._begin_silence()
elif self.current_silence:
await self._end_silence()
if silence_buffer:
if not self.diarization_before_transcription and self.transcription_queue:
await self.transcription_queue.put(silence_buffer)
if self.args.diarization and self.diarization_queue:
await self.diarization_queue.put(silence_buffer)
if self.translation_queue:
await self.translation_queue.put(silence_buffer)
if not self.current_silence:
await self._enqueue_active_audio(pcm_array)
if not self.silence:
if not self.diarization_before_transcription and self.transcription_queue:
await self.transcription_queue.put(pcm_array.copy())
if self.args.diarization and self.diarization_queue:
await self.diarization_queue.put(pcm_array.copy())
self.silence_duration = 0.0
if end_of_audio:
self.silence = True
self.start_silence = time()
self.total_pcm_samples = chunk_sample_end
if not self.args.transcription and not self.args.diarization:
await asyncio.sleep(0.1)

View File

@@ -0,0 +1,41 @@
import importlib.util
import logging
import platform
logger = logging.getLogger(__name__)
def module_available(module_name):
"""Return True if the given module can be imported."""
return importlib.util.find_spec(module_name) is not None
def mlx_backend_available(warn_on_missing = False):
is_macos = platform.system() == "Darwin"
is_arm = platform.machine() == "arm64"
available = (
is_macos
and is_arm
and module_available("mlx_whisper")
)
if not available and warn_on_missing and is_macos and is_arm:
logger.warning(
"=" * 50
+ "\nMLX Whisper not found but you are on Apple Silicon. "
"Consider installing mlx-whisper for better performance: "
"`pip install mlx-whisper`\n"
+ "=" * 50
)
return available
def faster_backend_available(warn_on_missing = False):
available = module_available("faster_whisper")
if not available and warn_on_missing and platform.system() != "Darwin":
logger.warning(
"=" * 50
+ "\nFaster-Whisper not found. Consider installing faster-whisper "
"for better performance: `pip install faster-whisper`\n"
+ "=" * 50
)
return available

View File

@@ -1,11 +1,9 @@
try:
from whisperlivekit.whisper_streaming_custom.whisper_online import backend_factory
from whisperlivekit.whisper_streaming_custom.online_asr import OnlineASRProcessor
except ImportError:
from .whisper_streaming_custom.whisper_online import backend_factory
from .whisper_streaming_custom.online_asr import OnlineASRProcessor
from whisperlivekit.local_agreement.whisper_online import backend_factory
from whisperlivekit.simul_whisper import SimulStreamingASR
from whisperlivekit.local_agreement.online_asr import OnlineASRProcessor
from argparse import Namespace
import sys
import logging
def update_with_kwargs(_dict, kwargs):
_dict.update({
@@ -13,6 +11,9 @@ def update_with_kwargs(_dict, kwargs):
})
return _dict
logger = logging.getLogger(__name__)
class TranscriptionEngine:
_instance = None
_initialized = False
@@ -44,18 +45,20 @@ class TranscriptionEngine:
"pcm_input": False,
"disable_punctuation_split" : False,
"diarization_backend": "sortformer",
"backend_policy": "simulstreaming",
"backend": "auto",
}
global_params = update_with_kwargs(global_params, kwargs)
transcription_common_params = {
"backend": "simulstreaming",
"warmup_file": None,
"min_chunk_size": 0.5,
"model_size": "tiny",
"min_chunk_size": 0.1,
"model_size": "base",
"model_cache_dir": None,
"model_dir": None,
"model_path": None,
"lan": "auto",
"task": "transcribe",
"direct_english_translation": False,
}
transcription_common_params = update_with_kwargs(transcription_common_params, kwargs)
@@ -81,10 +84,9 @@ class TranscriptionEngine:
use_onnx = kwargs.get('vac_onnx', False)
self.vac_model = load_silero_vad(onnx=use_onnx)
backend_policy = self.args.backend_policy
if self.args.transcription:
if self.args.backend == "simulstreaming":
from whisperlivekit.simul_whisper import SimulStreamingASR
if backend_policy == "simulstreaming":
simulstreaming_params = {
"disable_fast_encoder": False,
"custom_alignment_heads": None,
@@ -98,14 +100,19 @@ class TranscriptionEngine:
"init_prompt": None,
"static_init_prompt": None,
"max_context_tokens": None,
"model_path": './base.pt',
"preload_model_count": 1,
}
simulstreaming_params = update_with_kwargs(simulstreaming_params, kwargs)
self.tokenizer = None
self.asr = SimulStreamingASR(
**transcription_common_params, **simulstreaming_params
**transcription_common_params,
**simulstreaming_params,
backend=self.args.backend,
)
logger.info(
"Using SimulStreaming policy with %s backend",
getattr(self.asr, "encoder_backend", "whisper"),
)
else:
@@ -117,7 +124,13 @@ class TranscriptionEngine:
whisperstreaming_params = update_with_kwargs(whisperstreaming_params, kwargs)
self.asr = backend_factory(
**transcription_common_params, **whisperstreaming_params
backend=self.args.backend,
**transcription_common_params,
**whisperstreaming_params,
)
logger.info(
"Using LocalAgreement policy with %s backend",
getattr(self.asr, "backend_choice", self.asr.__class__.__name__),
)
if self.args.diarization:
@@ -138,7 +151,7 @@ class TranscriptionEngine:
self.translation_model = None
if self.args.target_language:
if self.args.lan == 'auto' and self.args.backend != "simulstreaming":
if self.args.lan == 'auto' and backend_policy != "simulstreaming":
raise Exception('Translation cannot be set with language auto when transcription backend is not simulstreaming')
else:
try:
@@ -155,7 +168,7 @@ class TranscriptionEngine:
def online_factory(args, asr):
if args.backend == "simulstreaming":
if args.backend_policy == "simulstreaming":
from whisperlivekit.simul_whisper import SimulStreamingOnlineProcessor
online = SimulStreamingOnlineProcessor(asr)
else:

View File

@@ -26,7 +26,7 @@ class DiarizationObserver(Observer):
"""Observer that logs all data emitted by the diarization pipeline and stores speaker segments."""
def __init__(self):
self.speaker_segments = []
self.diarization_segments = []
self.processed_time = 0
self.segment_lock = threading.Lock()
self.global_time_offset = 0.0
@@ -48,7 +48,7 @@ class DiarizationObserver(Observer):
for speaker, label in annotation._labels.items():
for start, end in zip(label.segments_boundaries_[:-1], label.segments_boundaries_[1:]):
print(f" {speaker}: {start:.2f}s-{end:.2f}s")
self.speaker_segments.append(SpeakerSegment(
self.diarization_segments.append(SpeakerSegment(
speaker=speaker,
start=start + self.global_time_offset,
end=end + self.global_time_offset
@@ -59,14 +59,14 @@ class DiarizationObserver(Observer):
def get_segments(self) -> List[SpeakerSegment]:
"""Get a copy of the current speaker segments."""
with self.segment_lock:
return self.speaker_segments.copy()
return self.diarization_segments.copy()
def clear_old_segments(self, older_than: float = 30.0):
"""Clear segments older than the specified time."""
with self.segment_lock:
current_time = self.processed_time
self.speaker_segments = [
segment for segment in self.speaker_segments
self.diarization_segments = [
segment for segment in self.diarization_segments
if current_time - segment.end < older_than
]
@@ -178,7 +178,6 @@ class DiartDiarization:
self.pipeline = SpeakerDiarization(config=config)
self.observer = DiarizationObserver()
self.lag_diart = None
if use_microphone:
self.source = MicrophoneAudioSource(block_duration=block_duration)
@@ -217,32 +216,6 @@ class DiartDiarization:
if self.custom_source:
self.custom_source.close()
def assign_speakers_to_tokens(self, tokens: list, use_punctuation_split: bool = False) -> float:
"""
Assign speakers to tokens based on timing overlap with speaker segments.
Uses the segments collected by the observer.
If use_punctuation_split is True, uses punctuation marks to refine speaker boundaries.
"""
segments = self.observer.get_segments()
# Debug logging
logger.debug(f"assign_speakers_to_tokens called with {len(tokens)} tokens")
logger.debug(f"Available segments: {len(segments)}")
for i, seg in enumerate(segments[:5]): # Show first 5 segments
logger.debug(f" Segment {i}: {seg.speaker} [{seg.start:.2f}-{seg.end:.2f}]")
if not self.lag_diart and segments and tokens:
self.lag_diart = segments[0].start - tokens[0].start
if not use_punctuation_split:
for token in tokens:
for segment in segments:
if not (segment.end <= token.start + self.lag_diart or segment.start >= token.end + self.lag_diart):
token.speaker = extract_number(segment.speaker) + 1
else:
tokens = add_speaker_to_tokens(segments, tokens)
return tokens
def concatenate_speakers(segments):
segments_concatenated = [{"speaker": 1, "begin": 0.0, "end": 0.0}]

View File

@@ -94,11 +94,11 @@ class SortformerDiarizationOnline:
model_name: Pre-trained model name (default: "nvidia/diar_streaming_sortformer_4spk-v2")
"""
self.sample_rate = sample_rate
self.speaker_segments = []
self.diarization_segments = []
self.diar_segments = []
self.buffer_audio = np.array([], dtype=np.float32)
self.segment_lock = threading.Lock()
self.global_time_offset = 0.0
self.processed_time = 0.0
self.debug = False
self.diar_model = shared_model.diar_model
@@ -155,12 +155,10 @@ class SortformerDiarizationOnline:
)
self.streaming_state.fifo_lengths = torch.zeros((batch_size,), dtype=torch.long, device=device)
self.streaming_state.mean_sil_emb = torch.zeros((batch_size, self.diar_model.sortformer_modules.fc_d_model), device=device)
self.streaming_state.n_sil_frames = torch.zeros((batch_size,), dtype=torch.long, device=device)
# Initialize total predictions tensor
self.streaming_state.n_sil_frames = torch.zeros((batch_size,), dtype=torch.long, device=device)
self.total_preds = torch.zeros((batch_size, 0, self.diar_model.sortformer_modules.n_spk), device=device)
def insert_silence(self, silence_duration: float):
def insert_silence(self, silence_duration: Optional[float]):
"""
Insert silence period by adjusting the global time offset.
@@ -171,248 +169,111 @@ class SortformerDiarizationOnline:
self.global_time_offset += silence_duration
logger.debug(f"Inserted silence of {silence_duration:.2f}s, new offset: {self.global_time_offset:.2f}s")
async def diarize(self, pcm_array: np.ndarray):
def insert_audio_chunk(self, pcm_array: np.ndarray):
if self.debug:
self.audio_buffer.append(pcm_array.copy())
self.buffer_audio = np.concatenate([self.buffer_audio, pcm_array.copy()])
async def diarize(self):
"""
Process audio data for diarization in streaming fashion.
Args:
pcm_array: Audio data as numpy array
"""
try:
if self.debug:
self.audio_buffer.append(pcm_array.copy())
threshold = int(self.chunk_duration_seconds * self.sample_rate)
threshold = int(self.chunk_duration_seconds * self.sample_rate)
if not len(self.buffer_audio) >= threshold:
return []
audio = self.buffer_audio[:threshold]
self.buffer_audio = self.buffer_audio[threshold:]
device = self.diar_model.device
audio_signal_chunk = torch.tensor(audio, device=device).unsqueeze(0)
audio_signal_length_chunk = torch.tensor([audio_signal_chunk.shape[1]], device=device)
processed_signal_chunk, processed_signal_length_chunk = self.audio2mel.get_features(
audio_signal_chunk, audio_signal_length_chunk
)
processed_signal_chunk = processed_signal_chunk.to(device)
processed_signal_length_chunk = processed_signal_length_chunk.to(device)
if self._previous_chunk_features is not None:
to_add = self._previous_chunk_features[:, :, -99:].to(device)
total_features = torch.concat([to_add, processed_signal_chunk], dim=2).to(device)
else:
total_features = processed_signal_chunk.to(device)
self._previous_chunk_features = processed_signal_chunk.to(device)
chunk_feat_seq_t = torch.transpose(total_features, 1, 2).to(device)
with torch.inference_mode():
left_offset = 8 if self._chunk_index > 0 else 0
right_offset = 8
self.buffer_audio = np.concatenate([self.buffer_audio, pcm_array.copy()])
if not len(self.buffer_audio) >= threshold:
return
audio = self.buffer_audio[:threshold]
self.buffer_audio = self.buffer_audio[threshold:]
device = self.diar_model.device
audio_signal_chunk = torch.tensor(audio, device=device).unsqueeze(0)
audio_signal_length_chunk = torch.tensor([audio_signal_chunk.shape[1]], device=device)
processed_signal_chunk, processed_signal_length_chunk = self.audio2mel.get_features(
audio_signal_chunk, audio_signal_length_chunk
)
processed_signal_chunk = processed_signal_chunk.to(device)
processed_signal_length_chunk = processed_signal_length_chunk.to(device)
if self._previous_chunk_features is not None:
to_add = self._previous_chunk_features[:, :, -99:].to(device)
total_features = torch.concat([to_add, processed_signal_chunk], dim=2).to(device)
else:
total_features = processed_signal_chunk.to(device)
self._previous_chunk_features = processed_signal_chunk.to(device)
chunk_feat_seq_t = torch.transpose(total_features, 1, 2).to(device)
with torch.inference_mode():
left_offset = 8 if self._chunk_index > 0 else 0
right_offset = 8
self.streaming_state, self.total_preds = self.diar_model.forward_streaming_step(
processed_signal=chunk_feat_seq_t,
processed_signal_length=torch.tensor([chunk_feat_seq_t.shape[1]]).to(device),
streaming_state=self.streaming_state,
total_preds=self.total_preds,
left_offset=left_offset,
right_offset=right_offset,
)
# Convert predictions to speaker segments
self._process_predictions()
self._chunk_index += 1
except Exception as e:
logger.error(f"Error in diarize: {e}")
raise
# TODO: Handle case when stream ends with partial buffer (accumulated_duration > 0 but < chunk_duration_seconds)
self.streaming_state, self.total_preds = self.diar_model.forward_streaming_step(
processed_signal=chunk_feat_seq_t,
processed_signal_length=torch.tensor([chunk_feat_seq_t.shape[1]]).to(device),
streaming_state=self.streaming_state,
total_preds=self.total_preds,
left_offset=left_offset,
right_offset=right_offset,
)
new_segments = self._process_predictions()
self._chunk_index += 1
return new_segments
def _process_predictions(self):
"""Process model predictions and convert to speaker segments."""
try:
preds_np = self.total_preds[0].cpu().numpy()
active_speakers = np.argmax(preds_np, axis=1)
if self._len_prediction is None:
self._len_prediction = len(active_speakers)
# Get predictions for current chunk
frame_duration = self.chunk_duration_seconds / self._len_prediction
current_chunk_preds = active_speakers[-self._len_prediction:]
with self.segment_lock:
# Process predictions into segments
base_time = self._chunk_index * self.chunk_duration_seconds + self.global_time_offset
for idx, spk in enumerate(current_chunk_preds):
start_time = base_time + idx * frame_duration
end_time = base_time + (idx + 1) * frame_duration
# Check if this continues the last segment or starts a new one
if (self.speaker_segments and
self.speaker_segments[-1].speaker == spk and
abs(self.speaker_segments[-1].end - start_time) < frame_duration * 0.5):
# Continue existing segment
self.speaker_segments[-1].end = end_time
else:
# Create new segment
self.speaker_segments.append(SpeakerSegment(
speaker=spk,
start=start_time,
end=end_time
))
# Update processed time
self.processed_time = max(self.processed_time, base_time + self.chunk_duration_seconds)
logger.debug(f"Processed chunk {self._chunk_index}, total segments: {len(self.speaker_segments)}")
except Exception as e:
logger.error(f"Error processing predictions: {e}")
def assign_speakers_to_tokens(self, tokens: list, use_punctuation_split: bool = False) -> list:
"""
Assign speakers to tokens based on timing overlap with speaker segments.
preds_np = self.total_preds[0].cpu().numpy()
active_speakers = np.argmax(preds_np, axis=1)
Args:
tokens: List of tokens with timing information
use_punctuation_split: Whether to use punctuation for boundary refinement
Returns:
List of tokens with speaker assignments
Last speaker_segment
"""
if self._len_prediction is None:
self._len_prediction = len(active_speakers) #12
frame_duration = self.chunk_duration_seconds / self._len_prediction
current_chunk_preds = active_speakers[-self._len_prediction:]
new_segments = []
with self.segment_lock:
segments = self.speaker_segments.copy()
if not segments or not tokens:
logger.debug("No segments or tokens available for speaker assignment")
return tokens
logger.debug(f"Assigning speakers to {len(tokens)} tokens using {len(segments)} segments")
use_punctuation_split = False
if not use_punctuation_split:
# Simple overlap-based assignment
for token in tokens:
token.speaker = -1 # Default to no speaker
for segment in segments:
# Check for timing overlap
if not (segment.end <= token.start or segment.start >= token.end):
token.speaker = segment.speaker + 1 # Convert to 1-based indexing
break
else:
# Use punctuation-aware assignment (similar to diart_backend)
tokens = self._add_speaker_to_tokens_with_punctuation(segments, tokens)
return tokens
def _add_speaker_to_tokens_with_punctuation(self, segments: List[SpeakerSegment], tokens: list) -> list:
"""
Assign speakers to tokens with punctuation-aware boundary adjustment.
Args:
segments: List of speaker segments
tokens: List of tokens to assign speakers to
Returns:
List of tokens with speaker assignments
"""
punctuation_marks = {'.', '!', '?'}
punctuation_tokens = [token for token in tokens if token.text.strip() in punctuation_marks]
# Convert segments to concatenated format
segments_concatenated = self._concatenate_speakers(segments)
# Adjust segment boundaries based on punctuation
for ind, segment in enumerate(segments_concatenated):
for i, punctuation_token in enumerate(punctuation_tokens):
if punctuation_token.start > segment['end']:
after_length = punctuation_token.start - segment['end']
before_length = segment['end'] - punctuation_tokens[i - 1].end if i > 0 else float('inf')
if before_length > after_length:
segment['end'] = punctuation_token.start
if i < len(punctuation_tokens) - 1 and ind + 1 < len(segments_concatenated):
segments_concatenated[ind + 1]['begin'] = punctuation_token.start
else:
segment['end'] = punctuation_tokens[i - 1].end if i > 0 else segment['end']
if i < len(punctuation_tokens) - 1 and ind - 1 >= 0:
segments_concatenated[ind - 1]['begin'] = punctuation_tokens[i - 1].end
break
# Ensure non-overlapping tokens
last_end = 0.0
for token in tokens:
start = max(last_end + 0.01, token.start)
token.start = start
token.end = max(start, token.end)
last_end = token.end
# Assign speakers based on adjusted segments
ind_last_speaker = 0
for segment in segments_concatenated:
for i, token in enumerate(tokens[ind_last_speaker:]):
if token.end <= segment['end']:
token.speaker = segment['speaker']
ind_last_speaker = i + 1
elif token.start > segment['end']:
break
return tokens
def _concatenate_speakers(self, segments: List[SpeakerSegment]) -> List[dict]:
"""
Concatenate consecutive segments from the same speaker.
Args:
segments: List of speaker segments
Returns:
List of concatenated speaker segments
"""
if not segments:
return []
segments_concatenated = [{"speaker": segments[0].speaker + 1, "begin": segments[0].start, "end": segments[0].end}]
for segment in segments[1:]:
speaker = segment.speaker + 1
if segments_concatenated[-1]['speaker'] != speaker:
segments_concatenated.append({"speaker": speaker, "begin": segment.start, "end": segment.end})
else:
segments_concatenated[-1]['end'] = segment.end
base_time = self._chunk_index * self.chunk_duration_seconds + self.global_time_offset
current_spk = current_chunk_preds[0]
start_time = round(base_time, 2)
for idx, spk in enumerate(current_chunk_preds):
current_time = round(base_time + idx * frame_duration, 2)
if spk != current_spk:
new_segments.append(SpeakerSegment(
speaker=current_spk,
start=start_time,
end=current_time
))
start_time = current_time
current_spk = spk
new_segments.append(
SpeakerSegment(
speaker=current_spk,
start=start_time,
end=current_time
)
)
return new_segments
return segments_concatenated
def get_segments(self) -> List[SpeakerSegment]:
"""Get a copy of the current speaker segments."""
with self.segment_lock:
return self.speaker_segments.copy()
def clear_old_segments(self, older_than: float = 30.0):
"""Clear segments older than the specified time."""
with self.segment_lock:
current_time = self.processed_time
self.speaker_segments = [
segment for segment in self.speaker_segments
if current_time - segment.end < older_than
]
logger.debug(f"Cleared old segments, remaining: {len(self.speaker_segments)}")
return self.diarization_segments.copy()
def close(self):
"""Close the diarization system and clean up resources."""
logger.info("Closing SortformerDiarization")
with self.segment_lock:
self.speaker_segments.clear()
self.diarization_segments.clear()
if self.debug:
concatenated_audio = np.concatenate(self.audio_buffer)
@@ -438,7 +299,7 @@ if __name__ == '__main__':
async def main():
"""TEST ONLY."""
an4_audio = 'audio_test.mp3'
an4_audio = 'diarization_audio.wav'
signal, sr = librosa.load(an4_audio, sr=16000)
signal = signal[:16000*30]
@@ -450,13 +311,15 @@ if __name__ == '__main__':
print("Speaker 0: 0:25 - 0:30")
print("=" * 50)
diarization = SortformerDiarization(sample_rate=16000)
diarization_backend = SortformerDiarization()
diarization = SortformerDiarizationOnline(shared_model = diarization_backend)
chunk_size = 1600
for i in range(0, len(signal), chunk_size):
chunk = signal[i:i+chunk_size]
await diarization.diarize(chunk)
new_segments = await diarization.diarize(chunk)
print(f"Processed chunk {i // chunk_size + 1}")
print(new_segments)
segments = diarization.get_segments()
print("\nDiarization results:")

View File

@@ -1,205 +0,0 @@
import numpy as np
import torch
import logging
from nemo.collections.asr.models import SortformerEncLabelModel
from nemo.collections.asr.modules import AudioToMelSpectrogramPreprocessor
import librosa
logger = logging.getLogger(__name__)
def load_model():
diar_model = SortformerEncLabelModel.from_pretrained("nvidia/diar_streaming_sortformer_4spk-v2")
diar_model.eval()
if torch.cuda.is_available():
diar_model.to(torch.device("cuda"))
#we target 1 second lag for the moment. chunk_len could be reduced.
diar_model.sortformer_modules.chunk_len = 10
diar_model.sortformer_modules.subsampling_factor = 10 #8 would be better ideally
diar_model.sortformer_modules.chunk_right_context = 0 #no.
diar_model.sortformer_modules.chunk_left_context = 10 #big so it compensiate the problem with no padding later.
diar_model.sortformer_modules.spkcache_len = 188
diar_model.sortformer_modules.fifo_len = 188
diar_model.sortformer_modules.spkcache_update_period = 144
diar_model.sortformer_modules.log = False
diar_model.sortformer_modules._check_streaming_parameters()
audio2mel = AudioToMelSpectrogramPreprocessor(
window_size= 0.025,
normalize="NA",
n_fft=512,
features=128,
pad_to=0) #pad_to 16 works better than 0. On test audio, we detect a third speaker for 1 second with pad_to=0. To solve that : increase left context to 10.
return diar_model, audio2mel
diar_model, audio2mel = load_model()
class StreamingSortformerState:
"""
This class creates a class instance that will be used to store the state of the
streaming Sortformer model.
Attributes:
spkcache (torch.Tensor): Speaker cache to store embeddings from start
spkcache_lengths (torch.Tensor): Lengths of the speaker cache
spkcache_preds (torch.Tensor): The speaker predictions for the speaker cache parts
fifo (torch.Tensor): FIFO queue to save the embedding from the latest chunks
fifo_lengths (torch.Tensor): Lengths of the FIFO queue
fifo_preds (torch.Tensor): The speaker predictions for the FIFO queue parts
spk_perm (torch.Tensor): Speaker permutation information for the speaker cache
mean_sil_emb (torch.Tensor): Mean silence embedding
n_sil_frames (torch.Tensor): Number of silence frames
"""
spkcache = None # Speaker cache to store embeddings from start
spkcache_lengths = None #
spkcache_preds = None # speaker cache predictions
fifo = None # to save the embedding from the latest chunks
fifo_lengths = None
fifo_preds = None
spk_perm = None
mean_sil_emb = None
n_sil_frames = None
def init_streaming_state(self, batch_size: int = 1, async_streaming: bool = False, device: torch.device = None):
"""
Initializes StreamingSortformerState with empty tensors or zero-valued tensors.
Args:
batch_size (int): Batch size for tensors in streaming state
async_streaming (bool): True for asynchronous update, False for synchronous update
device (torch.device): Device for tensors in streaming state
Returns:
streaming_state (SortformerStreamingState): initialized streaming state
"""
streaming_state = StreamingSortformerState()
if async_streaming:
streaming_state.spkcache = torch.zeros((batch_size, self.spkcache_len, self.fc_d_model), device=device)
streaming_state.spkcache_preds = torch.zeros((batch_size, self.spkcache_len, self.n_spk), device=device)
streaming_state.spkcache_lengths = torch.zeros((batch_size,), dtype=torch.long, device=device)
streaming_state.fifo = torch.zeros((batch_size, self.fifo_len, self.fc_d_model), device=device)
streaming_state.fifo_lengths = torch.zeros((batch_size,), dtype=torch.long, device=device)
else:
streaming_state.spkcache = torch.zeros((batch_size, 0, self.fc_d_model), device=device)
streaming_state.fifo = torch.zeros((batch_size, 0, self.fc_d_model), device=device)
streaming_state.mean_sil_emb = torch.zeros((batch_size, self.fc_d_model), device=device)
streaming_state.n_sil_frames = torch.zeros((batch_size,), dtype=torch.long, device=device)
return streaming_state
def process_diarization(chunks):
"""
what it does:
1. Preprocessing: Applies dithering and pre-emphasis (high-pass filter) if enabled
2. STFT: Computes the Short-Time Fourier Transform using:
- the window of window_size=0.025 --> size of a window : 400 samples
- the hop parameter : n_window_stride = 0.01 -> every 160 samples, a new window
3. Magnitude Calculation: Converts complex STFT output to magnitude spectrogram
4. Mel Conversion: Applies Mel filterbanks (128 filters in this case) to get Mel spectrogram
5. Logarithm: Takes the log of the Mel spectrogram (if `log=True`)
6. Normalization: Skips normalization since `normalize="NA"`
7. Padding: Pads the time dimension to a multiple of `pad_to` (default 16)
"""
previous_chunk = None
l_chunk_feat_seq_t = []
for chunk in chunks:
audio_signal_chunk = torch.tensor(chunk).unsqueeze(0).to(diar_model.device)
audio_signal_length_chunk = torch.tensor([audio_signal_chunk.shape[1]]).to(diar_model.device)
processed_signal_chunk, processed_signal_length_chunk = audio2mel.get_features(audio_signal_chunk, audio_signal_length_chunk)
if previous_chunk is not None:
to_add = previous_chunk[:, :, -99:]
total = torch.concat([to_add, processed_signal_chunk], dim=2)
else:
total = processed_signal_chunk
previous_chunk = processed_signal_chunk
l_chunk_feat_seq_t.append(torch.transpose(total, 1, 2))
batch_size = 1
streaming_state = init_streaming_state(diar_model.sortformer_modules,
batch_size = batch_size,
async_streaming = True,
device = diar_model.device
)
total_preds = torch.zeros((batch_size, 0, diar_model.sortformer_modules.n_spk), device=diar_model.device)
chunk_duration_seconds = diar_model.sortformer_modules.chunk_len * diar_model.sortformer_modules.subsampling_factor * diar_model.preprocessor._cfg.window_stride
l_speakers = [
{'start_time': 0,
'end_time': 0,
'speaker': 0
}
]
len_prediction = None
left_offset = 0
right_offset = 8
for i, chunk_feat_seq_t in enumerate(l_chunk_feat_seq_t):
with torch.inference_mode():
streaming_state, total_preds = diar_model.forward_streaming_step(
processed_signal=chunk_feat_seq_t,
processed_signal_length=torch.tensor([chunk_feat_seq_t.shape[1]]),
streaming_state=streaming_state,
total_preds=total_preds,
left_offset=left_offset,
right_offset=right_offset,
)
left_offset = 8
preds_np = total_preds[0].cpu().numpy()
active_speakers = np.argmax(preds_np, axis=1)
if len_prediction is None:
len_prediction = len(active_speakers) # we want to get the len of 1 prediction
frame_duration = chunk_duration_seconds / len_prediction
active_speakers = active_speakers[-len_prediction:]
for idx, spk in enumerate(active_speakers):
if spk != l_speakers[-1]['speaker']:
l_speakers.append(
{'start_time': (i * chunk_duration_seconds + idx * frame_duration),
'end_time': (i * chunk_duration_seconds + (idx + 1) * frame_duration),
'speaker': spk
})
else:
l_speakers[-1]['end_time'] = i * chunk_duration_seconds + (idx + 1) * frame_duration
"""
Should print
[{'start_time': 0, 'end_time': 8.72, 'speaker': 0},
{'start_time': 8.72, 'end_time': 18.88, 'speaker': 1},
{'start_time': 18.88, 'end_time': 24.96, 'speaker': 2},
{'start_time': 24.96, 'end_time': 31.68, 'speaker': 0}]
"""
for speaker in l_speakers:
print(f"Speaker {speaker['speaker']}: {speaker['start_time']:.2f}s - {speaker['end_time']:.2f}s")
if __name__ == '__main__':
an4_audio = 'audio_test.mp3'
signal, sr = librosa.load(an4_audio, sr=16000)
signal = signal[:16000*30]
# signal = signal[:-(len(signal)%16000)]
print("\n" + "=" * 50)
print("Expected ground truth:")
print("Speaker 0: 0:00 - 0:09")
print("Speaker 1: 0:09 - 0:19")
print("Speaker 2: 0:19 - 0:25")
print("Speaker 0: 0:25 - 0:30")
print("=" * 50)
chunk_size = 16000 # 1 second
chunks = []
for i in range(0, len(signal), chunk_size):
chunk = signal[i:i+chunk_size]
chunks.append(chunk)
process_diarization(chunks)

View File

@@ -6,6 +6,8 @@ import math
from typing import List
import numpy as np
from whisperlivekit.timed_objects import ASRToken
from whisperlivekit.model_paths import resolve_model_path, model_path_and_type
from whisperlivekit.whisper.transcribe import transcribe as whisper_transcribe
logger = logging.getLogger(__name__)
class ASRBase:
sep = " " # join transcribe words with this character (" " for whisper_timestamped,
@@ -37,40 +39,60 @@ class ASRBase:
raise NotImplementedError("must be implemented in the child class")
class WhisperTimestampedASR(ASRBase):
"""Uses whisper_timestamped as the backend."""
class WhisperASR(ASRBase):
"""Uses WhisperLiveKit's built-in Whisper implementation."""
sep = " "
def load_model(self, model_size=None, cache_dir=None, model_dir=None):
import whisper
import whisper_timestamped
from whisper_timestamped import transcribe_timestamped
from whisperlivekit.whisper import load_model as load_model
self.transcribe_timestamped = transcribe_timestamped
if model_dir is not None:
logger.debug("ignoring model_dir, not implemented")
return whisper.load_model(model_size, download_root=cache_dir)
resolved_path = resolve_model_path(model_dir)
if resolved_path.is_dir():
pytorch_path, _, _ = model_path_and_type(resolved_path)
if pytorch_path is None:
raise FileNotFoundError(
f"No supported PyTorch checkpoint found under {resolved_path}"
)
resolved_path = pytorch_path
logger.debug(f"Loading Whisper model from custom path {resolved_path}")
return load_model(str(resolved_path))
if model_size is None:
raise ValueError("Either model_size or model_dir must be set for WhisperASR")
return load_model(model_size, download_root=cache_dir)
def transcribe(self, audio, init_prompt=""):
result = self.transcribe_timestamped(
options = dict(self.transcribe_kargs)
options.pop("vad", None)
options.pop("vad_filter", None)
language = self.original_language if self.original_language else None
result = whisper_transcribe(
self.model,
audio,
language=self.original_language,
language=language,
initial_prompt=init_prompt,
verbose=None,
condition_on_previous_text=True,
**self.transcribe_kargs,
word_timestamps=True,
**options,
)
return result
def ts_words(self, r) -> List[ASRToken]:
"""
Converts the whisper_timestamped result to a list of ASRToken objects.
Converts the Whisper result to a list of ASRToken objects.
"""
tokens = []
for segment in r["segments"]:
for word in segment["words"]:
token = ASRToken(word["start"], word["end"], word["text"])
token = ASRToken(
word["start"],
word["end"],
word["word"],
probability=word.get("probability"),
)
tokens.append(token)
return tokens
@@ -78,11 +100,7 @@ class WhisperTimestampedASR(ASRBase):
return [segment["end"] for segment in res["segments"]]
def use_vad(self):
self.transcribe_kargs["vad"] = True
def set_translate_task(self):
self.transcribe_kargs["task"] = "translate"
logger.warning("VAD is not currently supported for WhisperASR backend and will be ignored.")
class FasterWhisperASR(ASRBase):
"""Uses faster-whisper as the backend."""
@@ -92,9 +110,10 @@ class FasterWhisperASR(ASRBase):
from faster_whisper import WhisperModel
if model_dir is not None:
logger.debug(f"Loading whisper model from model_dir {model_dir}. "
resolved_path = resolve_model_path(model_dir)
logger.debug(f"Loading faster-whisper model from {resolved_path}. "
f"model_size and cache_dir parameters are not used.")
model_size_or_path = model_dir
model_size_or_path = str(resolved_path)
elif model_size is not None:
model_size_or_path = model_size
else:
@@ -139,10 +158,6 @@ class FasterWhisperASR(ASRBase):
def use_vad(self):
self.transcribe_kargs["vad_filter"] = True
def set_translate_task(self):
self.transcribe_kargs["task"] = "translate"
class MLXWhisper(ASRBase):
"""
Uses MLX Whisper optimized for Apple Silicon.
@@ -154,8 +169,9 @@ class MLXWhisper(ASRBase):
import mlx.core as mx
if model_dir is not None:
logger.debug(f"Loading whisper model from model_dir {model_dir}. model_size parameter is not used.")
model_size_or_path = model_dir
resolved_path = resolve_model_path(model_dir)
logger.debug(f"Loading MLX Whisper model from {resolved_path}. model_size parameter is not used.")
model_size_or_path = str(resolved_path)
elif model_size is not None:
model_size_or_path = self.translate_model_name(model_size)
logger.debug(f"Loading whisper model {model_size}. You use mlx whisper, so {model_size_or_path} will be used.")
@@ -208,7 +224,8 @@ class MLXWhisper(ASRBase):
if segment.get("no_speech_prob", 0) > 0.9:
continue
for word in segment.get("words", []):
token = ASRToken(word["start"], word["end"], word["word"], probability=word["probability"])
probability=word["probability"]
token = ASRToken(word["start"], word["end"], word["word"])
tokens.append(token)
return tokens
@@ -218,10 +235,6 @@ class MLXWhisper(ASRBase):
def use_vad(self):
self.transcribe_kargs["vad_filter"] = True
def set_translate_task(self):
self.transcribe_kargs["task"] = "translate"
class OpenaiApiASR(ASRBase):
"""Uses OpenAI's Whisper API for transcription."""
def __init__(self, lan=None, temperature=0, logfile=sys.stderr):
@@ -232,7 +245,7 @@ class OpenaiApiASR(ASRBase):
self.temperature = temperature
self.load_model()
self.use_vad_opt = False
self.task = "transcribe"
self.direct_english_translation = False
def load_model(self, *args, **kwargs):
from openai import OpenAI
@@ -274,7 +287,7 @@ class OpenaiApiASR(ASRBase):
"temperature": self.temperature,
"timestamp_granularities": ["word", "segment"],
}
if self.task != "translate" and self.original_language:
if not self.direct_english_translation and self.original_language:
params["language"] = self.original_language
if prompt:
params["prompt"] = prompt
@@ -285,6 +298,3 @@ class OpenaiApiASR(ASRBase):
def use_vad(self):
self.use_vad_opt = True
def set_translate_task(self):
self.task = "translate"

View File

@@ -151,21 +151,32 @@ class OnlineASRProcessor:
"""Append an audio chunk (a numpy array) to the current audio buffer."""
self.audio_buffer = np.append(self.audio_buffer, audio)
def insert_silence(self, silence_duration, offset):
"""
If silences are > 5s, we do a complete context clear. Otherwise, we just insert a small silence and shift the last_attend_frame
"""
# if self.transcript_buffer.buffer:
# self.committed.extend(self.transcript_buffer.buffer)
# self.transcript_buffer.buffer = []
if True: #silence_duration < 3: #we want the last audio to be treated to not have a gap. could also be handled in the future in ends_with_silence.
gap_silence = np.zeros(int(16000 * silence_duration), dtype=np.int16)
self.insert_audio_chunk(gap_silence)
def start_silence(self):
if self.audio_buffer.size == 0:
return [], self.get_audio_buffer_end_time()
return self.process_iter()
def end_silence(self, silence_duration: Optional[float], offset: float):
if not silence_duration or silence_duration <= 0:
return
long_silence = silence_duration >= 5
if not long_silence:
gap_samples = int(self.SAMPLING_RATE * silence_duration)
if gap_samples > 0:
gap_silence = np.zeros(gap_samples, dtype=np.float32)
self.insert_audio_chunk(gap_silence)
else:
self.init(offset=silence_duration + offset)
self.global_time_offset += silence_duration
def insert_silence(self, silence_duration, offset):
"""
Backwards compatibility shim for legacy callers that still use insert_silence.
"""
self.end_silence(silence_duration, offset)
def prompt(self) -> Tuple[str, str]:
"""
Returns a tuple: (prompt, context), where:
@@ -400,11 +411,11 @@ class OnlineASRProcessor:
) -> Transcript:
sep = sep if sep is not None else self.asr.sep
text = sep.join(token.text for token in tokens)
probability = sum(token.probability for token in tokens if token.probability) / len(tokens) if tokens else None
# probability = sum(token.probability for token in tokens if token.probability) / len(tokens) if tokens else None
if tokens:
start = offset + tokens[0].start
end = offset + tokens[-1].end
else:
start = None
end = None
return Transcript(start, end, text, probability=probability)
return Transcript(start, end, text)

View File

@@ -0,0 +1,199 @@
#!/usr/bin/env python3
import sys
import numpy as np
import librosa
from functools import lru_cache
import time
import logging
import platform
from .backends import FasterWhisperASR, MLXWhisper, WhisperASR, OpenaiApiASR
from whisperlivekit.warmup import warmup_asr
from whisperlivekit.model_paths import resolve_model_path, model_path_and_type
from whisperlivekit.backend_support import (
mlx_backend_available,
faster_backend_available,
)
logger = logging.getLogger(__name__)
WHISPER_LANG_CODES = "af,am,ar,as,az,ba,be,bg,bn,bo,br,bs,ca,cs,cy,da,de,el,en,es,et,eu,fa,fi,fo,fr,gl,gu,ha,haw,he,hi,hr,ht,hu,hy,id,is,it,ja,jw,ka,kk,km,kn,ko,la,lb,ln,lo,lt,lv,mg,mi,mk,ml,mn,mr,ms,mt,my,ne,nl,nn,no,oc,pa,pl,ps,pt,ro,ru,sa,sd,si,sk,sl,sn,so,sq,sr,su,sv,sw,ta,te,tg,th,tk,tl,tr,tt,uk,ur,uz,vi,yi,yo,zh".split(
","
)
def create_tokenizer(lan):
"""returns an object that has split function that works like the one of MosesTokenizer"""
assert (
lan in WHISPER_LANG_CODES
), "language must be Whisper's supported lang code: " + " ".join(WHISPER_LANG_CODES)
if lan == "uk":
import tokenize_uk
class UkrainianTokenizer:
def split(self, text):
return tokenize_uk.tokenize_sents(text)
return UkrainianTokenizer()
# supported by fast-mosestokenizer
if (
lan
in "as bn ca cs de el en es et fi fr ga gu hi hu is it kn lt lv ml mni mr nl or pa pl pt ro ru sk sl sv ta te yue zh".split()
):
from mosestokenizer import MosesSentenceSplitter
return MosesSentenceSplitter(lan)
# the following languages are in Whisper, but not in wtpsplit:
if (
lan
in "as ba bo br bs fo haw hr ht jw lb ln lo mi nn oc sa sd sn so su sw tk tl tt".split()
):
logger.debug(
f"{lan} code is not supported by wtpsplit. Going to use None lang_code option."
)
lan = None
from wtpsplit import WtP
# downloads the model from huggingface on the first use
wtp = WtP("wtp-canine-s-12l-no-adapters")
class WtPtok:
def split(self, sent):
return wtp.split(sent, lang_code=lan)
return WtPtok()
def backend_factory(
backend,
lan,
model_size,
model_cache_dir,
model_dir,
model_path,
direct_english_translation,
buffer_trimming,
buffer_trimming_sec,
confidence_validation,
warmup_file=None,
min_chunk_size=None,
):
backend_choice = backend
custom_reference = model_path or model_dir
resolved_root = None
pytorch_checkpoint = None
has_mlx_weights = False
has_fw_weights = False
if custom_reference:
resolved_root = resolve_model_path(custom_reference)
if resolved_root.is_dir():
pytorch_checkpoint, has_mlx_weights, has_fw_weights = model_path_and_type(resolved_root)
else:
pytorch_checkpoint = resolved_root
if backend_choice == "openai-api":
logger.debug("Using OpenAI API.")
asr = OpenaiApiASR(lan=lan)
else:
backend_choice = _normalize_backend_choice(
backend_choice,
resolved_root,
has_mlx_weights,
has_fw_weights,
)
if backend_choice == "faster-whisper":
asr_cls = FasterWhisperASR
if resolved_root is not None and not resolved_root.is_dir():
raise ValueError("Faster-Whisper backend expects a directory with CTranslate2 weights.")
model_override = str(resolved_root) if resolved_root is not None else None
elif backend_choice == "mlx-whisper":
asr_cls = MLXWhisper
if resolved_root is not None and not resolved_root.is_dir():
raise ValueError("MLX Whisper backend expects a directory containing MLX weights.")
model_override = str(resolved_root) if resolved_root is not None else None
else:
asr_cls = WhisperASR
model_override = str(pytorch_checkpoint) if pytorch_checkpoint is not None else None
if custom_reference and model_override is None:
raise FileNotFoundError(
f"No PyTorch checkpoint found under {resolved_root or custom_reference}"
)
t = time.time()
logger.info(f"Loading Whisper {model_size} model for language {lan} using backend {backend_choice}...")
asr = asr_cls(
model_size=model_size,
lan=lan,
cache_dir=model_cache_dir,
model_dir=model_override,
)
e = time.time()
logger.info(f"done. It took {round(e-t,2)} seconds.")
if direct_english_translation:
tgt_language = "en" # Whisper translates into English
else:
tgt_language = lan # Whisper transcribes in this language
# Create the tokenizer
if buffer_trimming == "sentence":
tokenizer = create_tokenizer(tgt_language)
else:
tokenizer = None
warmup_asr(asr, warmup_file)
asr.confidence_validation = confidence_validation
asr.tokenizer = tokenizer
asr.buffer_trimming = buffer_trimming
asr.buffer_trimming_sec = buffer_trimming_sec
asr.backend_choice = backend_choice
return asr
def _normalize_backend_choice(
preferred_backend,
resolved_root,
has_mlx_weights,
has_fw_weights,
):
backend_choice = preferred_backend
if backend_choice == "auto":
if mlx_backend_available(warn_on_missing=True) and (resolved_root is None or has_mlx_weights):
return "mlx-whisper"
if faster_backend_available(warn_on_missing=True) and (resolved_root is None or has_fw_weights):
return "faster-whisper"
return "whisper"
if backend_choice == "mlx-whisper":
if not mlx_backend_available():
raise RuntimeError("mlx-whisper backend requested but mlx-whisper is not installed.")
if resolved_root is not None and not has_mlx_weights:
raise FileNotFoundError(
f"mlx-whisper backend requested but no MLX weights were found under {resolved_root}"
)
if platform.system() != "Darwin":
logger.warning("mlx-whisper backend requested on a non-macOS system; this may fail.")
return backend_choice
if backend_choice == "faster-whisper":
if not faster_backend_available():
raise RuntimeError("faster-whisper backend requested but faster-whisper is not installed.")
if resolved_root is not None and not has_fw_weights:
raise FileNotFoundError(
f"faster-whisper backend requested but no Faster-Whisper weights were found under {resolved_root}"
)
return backend_choice
if backend_choice == "whisper":
return backend_choice
raise ValueError(f"Unknown backend '{preferred_backend}' for LocalAgreement.")

View File

@@ -0,0 +1,69 @@
from pathlib import Path
from typing import Optional, Tuple, Union
def model_path_and_type(model_path: Union[str, Path]) -> Tuple[Optional[Path], bool, bool]:
"""
Inspect the provided path and determine which model formats are available.
Returns:
pytorch_path: Path to a PyTorch checkpoint (if present).
compatible_whisper_mlx: True if MLX weights exist in this folder.
compatible_faster_whisper: True if Faster-Whisper (ctranslate2) weights exist.
"""
path = Path(model_path)
compatible_whisper_mlx = False
compatible_faster_whisper = False
pytorch_path: Optional[Path] = None
if path.is_file() and path.suffix.lower() in [".pt", ".safetensors", ".bin"]:
pytorch_path = path
return pytorch_path, compatible_whisper_mlx, compatible_faster_whisper
if path.is_dir():
for file in path.iterdir():
if not file.is_file():
continue
filename = file.name.lower()
suffix = file.suffix.lower()
if filename in {"weights.npz", "weights.safetensors"}:
compatible_whisper_mlx = True
elif filename in {"model.bin", "encoder.bin", "decoder.bin"}:
compatible_faster_whisper = True
elif suffix in {".pt", ".safetensors"}:
pytorch_path = file
elif filename == "pytorch_model.bin":
pytorch_path = file
if pytorch_path is None:
fallback = path / "pytorch_model.bin"
if fallback.exists():
pytorch_path = fallback
return pytorch_path, compatible_whisper_mlx, compatible_faster_whisper
def resolve_model_path(model_path: Union[str, Path]) -> Path:
"""
Return a local path for the provided model reference.
If the path does not exist locally, it is treated as a Hugging Face repo id
and downloaded via snapshot_download.
"""
path = Path(model_path).expanduser()
if path.exists():
return path
try:
from huggingface_hub import snapshot_download
except ImportError as exc: # pragma: no cover - optional dependency guard
raise FileNotFoundError(
f"Model path '{model_path}' does not exist locally and huggingface_hub "
"is not installed to download it."
) from exc
downloaded_path = Path(snapshot_download(repo_id=str(model_path)))
return downloaded_path

View File

@@ -81,14 +81,14 @@ def parse_args():
parser.add_argument(
"--min-chunk-size",
type=float,
default=0.5,
default=0.1,
help="Minimum audio chunk size in seconds. It waits up to this time to do processing. If the processing takes shorter time, it waits, otherwise it processes the whole segment that was received by this time.",
)
parser.add_argument(
"--model",
type=str,
default="small",
default="base",
dest='model_size',
help="Name size of the Whisper model to use (default: tiny). Suggested values: tiny.en,tiny,base.en,base,small.en,small,medium.en,medium,large-v1,large-v2,large-v3,large,large-v3-turbo. The model is automatically downloaded from the model hub if not present in model cache dir.",
)
@@ -114,11 +114,10 @@ def parse_args():
help="Source language code, e.g. en,de,cs, or 'auto' for language detection.",
)
parser.add_argument(
"--task",
type=str,
default="transcribe",
choices=["transcribe", "translate"],
help="Transcribe or translate.",
"--direct-english-translation",
action="store_true",
default=False,
help="Use Whisper to directly translate to english.",
)
parser.add_argument(
@@ -130,11 +129,18 @@ def parse_args():
)
parser.add_argument(
"--backend",
"--backend-policy",
type=str,
default="simulstreaming",
choices=["faster-whisper", "whisper_timestamped", "mlx-whisper", "openai-api", "simulstreaming"],
help="Load only this backend for Whisper processing.",
choices=["1", "2", "simulstreaming", "localagreement"],
help="Select the streaming policy: 1 or 'simulstreaming' for AlignAtt, 2 or 'localagreement' for LocalAgreement.",
)
parser.add_argument(
"--backend",
type=str,
default="auto",
choices=["auto", "mlx-whisper", "faster-whisper", "whisper", "openai-api"],
help="Select the Whisper backend implementation (auto: prefer MLX on macOS, otherwise Faster-Whisper, else Whisper). Use 'openai-api' with --backend-policy localagreement to call OpenAI's API.",
)
parser.add_argument(
"--no-vac",
@@ -317,5 +323,10 @@ def parse_args():
args.vad = not args.no_vad
delattr(args, 'no_transcription')
delattr(args, 'no_vad')
if args.backend_policy == "1":
args.backend_policy = "simulstreaming"
elif args.backend_policy == "2":
args.backend_policy = "localagreement"
return args

View File

@@ -1,106 +0,0 @@
from whisperlivekit.timed_objects import ASRToken
from time import time
import re
MIN_SILENCE_DURATION = 4 #in seconds
END_SILENCE_DURATION = 8 #in seconds. you should keep it important to not have false positive when the model lag is important
END_SILENCE_DURATION_VAC = 3 #VAC is good at detecting silences, but we want to skip the smallest silences
def blank_to_silence(tokens):
full_string = ''.join([t.text for t in tokens])
patterns = [re.compile(r'(?:\s*\[BLANK_AUDIO\]\s*)+'), re.compile(r'(?:\s*\[typing\]\s*)+')]
matches = []
for pattern in patterns:
for m in pattern.finditer(full_string):
matches.append({
'start': m.start(),
'end': m.end()
})
if matches:
# cleaned = pattern.sub(' ', full_string).strip()
# print("Cleaned:", cleaned)
cumulated_len = 0
silence_token = None
cleaned_tokens = []
for token in tokens:
if matches:
start = cumulated_len
end = cumulated_len + len(token.text)
cumulated_len = end
if start >= matches[0]['start'] and end <= matches[0]['end']:
if silence_token: #previous token was already silence
silence_token.start = min(silence_token.start, token.start)
silence_token.end = max(silence_token.end, token.end)
else: #new silence
silence_token = ASRToken(
start=token.start,
end=token.end,
speaker=-2,
probability=0.95
)
else:
if silence_token: #there was silence but no more
if silence_token.duration() >= MIN_SILENCE_DURATION:
cleaned_tokens.append(
silence_token
)
silence_token = None
matches.pop(0)
cleaned_tokens.append(token)
# print(cleaned_tokens)
return cleaned_tokens
return tokens
def no_token_to_silence(tokens):
new_tokens = []
silence_token = None
for token in tokens:
if token.speaker == -2:
if new_tokens and new_tokens[-1].speaker == -2: #if token is silence and previous one too
new_tokens[-1].end = token.end
else:
new_tokens.append(token)
last_end = new_tokens[-1].end if new_tokens else 0.0
if token.start - last_end >= MIN_SILENCE_DURATION: #if token is not silence but important gap
if new_tokens and new_tokens[-1].speaker == -2:
new_tokens[-1].end = token.start
else:
silence_token = ASRToken(
start=last_end,
end=token.start,
speaker=-2,
probability=0.95
)
new_tokens.append(silence_token)
if token.speaker != -2:
new_tokens.append(token)
return new_tokens
def ends_with_silence(tokens, beg_loop, vac_detected_silence):
current_time = time() - (beg_loop if beg_loop else 0.0)
last_token = tokens[-1]
if vac_detected_silence or (current_time - last_token.end >= END_SILENCE_DURATION):
if last_token.speaker == -2:
last_token.end = current_time
else:
tokens.append(
ASRToken(
start=tokens[-1].end,
end=current_time,
speaker=-2,
probability=0.95
)
)
return tokens
def handle_silences(tokens, beg_loop, vac_detected_silence):
if not tokens:
return []
tokens = blank_to_silence(tokens) #useful for simulstreaming backend which tends to generate [BLANK_AUDIO] text
tokens = no_token_to_silence(tokens)
tokens = ends_with_silence(tokens, beg_loop, vac_detected_silence)
return tokens

View File

@@ -1,154 +0,0 @@
import logging
from whisperlivekit.remove_silences import handle_silences
from whisperlivekit.timed_objects import Line, format_time
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
CHECK_AROUND = 4
DEBUG = False
def is_punctuation(token):
if token.is_punctuation():
return True
return False
def next_punctuation_change(i, tokens):
for ind in range(i+1, min(len(tokens), i+CHECK_AROUND+1)):
if is_punctuation(tokens[ind]):
return ind
return None
def next_speaker_change(i, tokens, speaker):
for ind in range(i-1, max(0, i-CHECK_AROUND)-1, -1):
token = tokens[ind]
if is_punctuation(token):
break
if token.speaker != speaker:
return ind, token.speaker
return None, speaker
def new_line(
token,
):
return Line(
speaker = token.corrected_speaker,
text = token.text + (f"[{format_time(token.start)} : {format_time(token.end)}]" if DEBUG else ""),
start = token.start,
end = token.end,
detected_language=token.detected_language
)
def append_token_to_last_line(lines, sep, token):
if not lines:
lines.append(new_line(token))
else:
if token.text:
lines[-1].text += sep + token.text + (f"[{format_time(token.start)} : {format_time(token.end)}]" if DEBUG else "")
lines[-1].end = token.end
if not lines[-1].detected_language and token.detected_language:
lines[-1].detected_language = token.detected_language
def format_output(state, silence, args, sep):
diarization = args.diarization
disable_punctuation_split = args.disable_punctuation_split
tokens = state.tokens
translation_validated_segments = state.translation_validated_segments # Here we will attribute the speakers only based on the timestamps of the segments
translation_buffer = state.translation_buffer
last_validated_token = state.last_validated_token
previous_speaker = 1
undiarized_text = []
tokens = handle_silences(tokens, state.beg_loop, silence)
last_punctuation = None
for i, token in enumerate(tokens[last_validated_token:]):
speaker = int(token.speaker)
token.corrected_speaker = speaker
if not diarization:
if speaker == -1: #Speaker -1 means no attributed by diarization. In the frontend, it should appear under 'Speaker 1'
token.corrected_speaker = 1
token.validated_speaker = True
else:
if is_punctuation(token):
last_punctuation = i
if last_punctuation == i-1:
if token.speaker != previous_speaker:
token.validated_speaker = True
# perfect, diarization perfectly aligned
last_punctuation = None
else:
speaker_change_pos, new_speaker = next_speaker_change(i, tokens, speaker)
if speaker_change_pos:
# Corrects delay:
# That was the idea. <Okay> haha |SPLIT SPEAKER| that's a good one
# should become:
# That was the idea. |SPLIT SPEAKER| <Okay> haha that's a good one
token.corrected_speaker = new_speaker
token.validated_speaker = True
elif speaker != previous_speaker:
if not (speaker == -2 or previous_speaker == -2):
if next_punctuation_change(i, tokens):
# Corrects advance:
# Are you |SPLIT SPEAKER| <okay>? yeah, sure. Absolutely
# should become:
# Are you <okay>? |SPLIT SPEAKER| yeah, sure. Absolutely
token.corrected_speaker = previous_speaker
token.validated_speaker = True
else: #Problematic, except if the language has no punctuation. We append to previous line, except if disable_punctuation_split is set to True.
if not disable_punctuation_split:
token.corrected_speaker = previous_speaker
token.validated_speaker = False
if token.validated_speaker:
state.last_validated_token = i
previous_speaker = token.corrected_speaker
previous_speaker = 1
lines = []
for token in tokens:
if int(token.corrected_speaker) != int(previous_speaker):
lines.append(new_line(token))
else:
append_token_to_last_line(lines, sep, token)
previous_speaker = token.corrected_speaker
if lines:
unassigned_translated_segments = []
for ts in translation_validated_segments:
assigned = False
for line in lines:
if ts and ts.overlaps_with(line):
if ts.is_within(line):
line.translation += ts.text + ' '
assigned = True
break
else:
ts0, ts1 = ts.approximate_cut_at(line.end)
if ts0 and line.overlaps_with(ts0):
line.translation += ts0.text + ' '
if ts1:
unassigned_translated_segments.append(ts1)
assigned = True
break
if not assigned:
unassigned_translated_segments.append(ts)
if unassigned_translated_segments:
for line in lines:
remaining_segments = []
for ts in unassigned_translated_segments:
if ts and ts.overlaps_with(line):
line.translation += ts.text + ' '
else:
remaining_segments.append(ts)
unassigned_translated_segments = remaining_segments #maybe do smth in the future about that
if state.buffer_transcription and lines:
lines[-1].end = max(state.buffer_transcription.end, lines[-1].end)
return lines, undiarized_text

View File

@@ -2,64 +2,39 @@ import sys
import numpy as np
import logging
from typing import List, Tuple, Optional
import logging
import platform
from whisperlivekit.timed_objects import ASRToken, Transcript, ChangeSpeaker
from whisperlivekit.warmup import load_file
from .whisper import load_model, tokenizer
from .whisper.audio import TOKENS_PER_SECOND
from whisperlivekit.whisper import load_model, tokenizer
from whisperlivekit.whisper.audio import TOKENS_PER_SECOND
import os
import gc
from pathlib import Path
logger = logging.getLogger(__name__)
from whisperlivekit.model_paths import model_path_and_type, resolve_model_path
from whisperlivekit.backend_support import (
mlx_backend_available,
faster_backend_available,
)
import torch
from whisperlivekit.simul_whisper.config import AlignAttConfig
from whisperlivekit.simul_whisper.simul_whisper import PaddedAlignAttWhisper
from whisperlivekit.simul_whisper.whisper import tokenizer
from whisperlivekit.simul_whisper.simul_whisper import AlignAtt
try:
from .mlx_encoder import mlx_model_mapping, load_mlx_encoder
HAS_MLX_WHISPER = True
except ImportError:
if platform.system() == "Darwin" and platform.machine() == "arm64":
print(f"""{"="*50}\nMLX Whisper not found but you are on Apple Silicon. Consider installing mlx-whisper for better performance: `pip install mlx-whisper`\n{"="*50}""")
HAS_MLX_WHISPER = False
logger = logging.getLogger(__name__)
HAS_MLX_WHISPER = mlx_backend_available(warn_on_missing=True)
if HAS_MLX_WHISPER:
HAS_FASTER_WHISPER = False
from .mlx_encoder import mlx_model_mapping, load_mlx_encoder
else:
try:
from faster_whisper import WhisperModel
HAS_FASTER_WHISPER = True
except ImportError:
if platform.system() != "Darwin":
print(f"""{"="*50}\nFaster-Whisper not found but. Consider installing faster-whisper for better performance: `pip install faster-whisper`\n{"="*50}`""")
HAS_FASTER_WHISPER = False
def model_path_and_type(model_path):
path = Path(model_path)
compatible_whisper_mlx = False
compatible_faster_whisper = False
pytorch_path = None
if path.is_file() and path.suffix.lower() in ['.pt', '.safetensors', '.bin']:
pytorch_path = path
elif path.is_dir():
for file in path.iterdir():
if file.is_file():
if file.name in ['weights.npz', "weights.safetensors"]:
compatible_whisper_mlx = True
elif file.suffix.lower() == '.bin':
compatible_faster_whisper = True
elif file.suffix.lower() == '.pt':
pytorch_path = file
elif file.suffix.lower() == '.safetensors':
pytorch_path = file
if pytorch_path is None:
if (model_path / Path("pytorch_model.bin")).exists():
pytorch_path = model_path / Path("pytorch_model.bin")
return pytorch_path, compatible_whisper_mlx, compatible_faster_whisper
mlx_model_mapping = {}
HAS_FASTER_WHISPER = faster_backend_available(warn_on_missing=not HAS_MLX_WHISPER)
if HAS_FASTER_WHISPER:
from faster_whisper import WhisperModel
else:
WhisperModel = None
MIN_DURATION_REAL_SILENCE = 5
class SimulStreamingOnlineProcessor:
SAMPLING_RATE = 16000
@@ -83,23 +58,29 @@ class SimulStreamingOnlineProcessor:
def load_new_backend(self):
model = self.asr.get_new_model_instance()
self.model = PaddedAlignAttWhisper(
self.model = AlignAtt(
cfg=self.asr.cfg,
loaded_model=model,
mlx_encoder=self.asr.mlx_encoder,
fw_encoder=self.asr.fw_encoder,
)
def insert_silence(self, silence_duration, offset):
def start_silence(self):
tokens, processed_upto = self.process_iter(is_last=True)
return tokens, processed_upto
def end_silence(self, silence_duration, offset):
"""
If silences are > 5s, we do a complete context clear. Otherwise, we just insert a small silence and shift the last_attend_frame
If silences are > MIN_DURATION_REAL_SILENCE, we do a complete context clear. Otherwise, we just insert a small silence and shift the last_attend_frame
"""
if silence_duration < 5:
gap_silence = torch.zeros(int(16000*silence_duration))
self.model.insert_audio(gap_silence)
# self.global_time_offset += silence_duration
else:
self.process_iter(is_last=True) #we want to totally process what remains in the buffer.
self.end += silence_duration
long_silence = silence_duration >= MIN_DURATION_REAL_SILENCE
if not long_silence:
gap_len = int(16000 * silence_duration)
if gap_len > 0:
gap_silence = torch.zeros(gap_len)
self.model.insert_audio(gap_silence)
if long_silence:
self.model.refresh_segment(complete=True)
self.model.global_time_offset = silence_duration + offset
@@ -177,11 +158,22 @@ class SimulStreamingASR():
self.decoder_type = 'greedy' if self.beams == 1 else 'beam'
self.fast_encoder = False
self._resolved_model_path = None
self.encoder_backend = "whisper"
preferred_backend = getattr(self, "backend", "auto")
self.pytorch_path, compatible_whisper_mlx, compatible_faster_whisper = None, True, True
if self.model_path:
self.pytorch_path, compatible_whisper_mlx, compatible_faster_whisper = model_path_and_type(self.model_path)
self.model_name = self.pytorch_path.stem
is_multilingual = not self.model_path.endswith(".en")
resolved_model_path = resolve_model_path(self.model_path)
self._resolved_model_path = resolved_model_path
self.model_path = str(resolved_model_path)
self.pytorch_path, compatible_whisper_mlx, compatible_faster_whisper = model_path_and_type(resolved_model_path)
if self.pytorch_path:
self.model_name = self.pytorch_path.stem
else:
self.model_name = Path(self.model_path).stem
raise FileNotFoundError(
f"No PyTorch checkpoint (.pt/.bin/.safetensors) found under {self.model_path}"
)
elif self.model_size is not None:
model_mapping = {
'tiny': './tiny.pt',
@@ -198,7 +190,19 @@ class SimulStreamingASR():
'large': './large-v3.pt'
}
self.model_name = self.model_size
is_multilingual = not self.model_name.endswith(".en")
else:
raise ValueError("Either model_size or model_path must be specified for SimulStreaming.")
is_multilingual = not self.model_name.endswith(".en")
self.encoder_backend = self._resolve_encoder_backend(
preferred_backend,
compatible_whisper_mlx,
compatible_faster_whisper,
)
self.fast_encoder = self.encoder_backend in ("mlx-whisper", "faster-whisper")
if self.encoder_backend == "whisper":
self.disable_fast_encoder = True
self.cfg = AlignAttConfig(
tokenizer_is_multilingual= is_multilingual,
@@ -210,7 +214,7 @@ class SimulStreamingASR():
cif_ckpt_path=self.cif_ckpt_path,
decoder_type="beam",
beam_size=self.beams,
task=self.task,
task=self.direct_english_translation,
never_fire=self.never_fire,
init_prompt=self.init_prompt,
max_context_tokens=self.max_context_tokens,
@@ -218,7 +222,7 @@ class SimulStreamingASR():
)
# Set up tokenizer for translation if needed
if self.task == "translate":
if self.direct_english_translation:
self.tokenizer = self.set_translate_task()
else:
self.tokenizer = None
@@ -227,32 +231,72 @@ class SimulStreamingASR():
self.mlx_encoder, self.fw_encoder = None, None
if not self.disable_fast_encoder:
if HAS_MLX_WHISPER:
print('Simulstreaming will use MLX whisper to increase encoding speed.')
if self.model_path and compatible_whisper_mlx:
mlx_model = self.model_path
else:
mlx_model = mlx_model_mapping.get(self.model_name)
if mlx_model:
self.mlx_encoder = load_mlx_encoder(path_or_hf_repo=mlx_model)
self.fast_encoder = True
elif HAS_FASTER_WHISPER and compatible_faster_whisper:
print('Simulstreaming will use Faster Whisper for the encoder.')
if self.model_path and compatible_faster_whisper:
fw_model = self.model_path
else:
fw_model = self.model_name
self.fw_encoder = WhisperModel(
fw_model,
device='auto',
compute_type='auto',
if self.encoder_backend == "mlx-whisper":
print('Simulstreaming will use MLX whisper to increase encoding speed.')
if self._resolved_model_path is not None:
mlx_model = str(self._resolved_model_path)
else:
mlx_model = mlx_model_mapping.get(self.model_name)
if not mlx_model:
raise FileNotFoundError(
f"MLX Whisper backend requested but no compatible weights found for model '{self.model_name}'."
)
self.fast_encoder = True
self.mlx_encoder = load_mlx_encoder(path_or_hf_repo=mlx_model)
elif self.encoder_backend == "faster-whisper":
print('Simulstreaming will use Faster Whisper for the encoder.')
if self._resolved_model_path is not None:
fw_model = str(self._resolved_model_path)
else:
fw_model = self.model_name
self.fw_encoder = WhisperModel(
fw_model,
device='auto',
compute_type='auto',
)
self.models = [self.load_model() for i in range(self.preload_model_count)]
def _resolve_encoder_backend(self, preferred_backend, compatible_whisper_mlx, compatible_faster_whisper):
choice = preferred_backend or "auto"
if self.disable_fast_encoder:
return "whisper"
if choice == "whisper":
return "whisper"
if choice == "mlx-whisper":
if not self._can_use_mlx(compatible_whisper_mlx):
raise RuntimeError("mlx-whisper backend requested but MLX Whisper is unavailable or incompatible with the provided model.")
return "mlx-whisper"
if choice == "faster-whisper":
if not self._can_use_faster(compatible_faster_whisper):
raise RuntimeError("faster-whisper backend requested but Faster-Whisper is unavailable or incompatible with the provided model.")
return "faster-whisper"
if choice == "openai-api":
raise ValueError("openai-api backend is only supported with the LocalAgreement policy.")
# auto mode
if platform.system() == "Darwin" and self._can_use_mlx(compatible_whisper_mlx):
return "mlx-whisper"
if self._can_use_faster(compatible_faster_whisper):
return "faster-whisper"
return "whisper"
def _has_custom_model_path(self):
return self._resolved_model_path is not None
def _can_use_mlx(self, compatible_whisper_mlx):
if not HAS_MLX_WHISPER:
return False
if self._has_custom_model_path():
return compatible_whisper_mlx
return self.model_name in mlx_model_mapping
def _can_use_faster(self, compatible_faster_whisper):
if not HAS_FASTER_WHISPER:
return False
if self._has_custom_model_path():
return compatible_faster_whisper
return True
def load_model(self):
whisper_model = load_model(
name=self.pytorch_path if self.pytorch_path else self.model_name,
@@ -264,7 +308,7 @@ class SimulStreamingASR():
if warmup_audio is not None:
warmup_audio = torch.from_numpy(warmup_audio).float()
if self.fast_encoder:
temp_model = PaddedAlignAttWhisper(
temp_model = AlignAtt(
cfg=self.cfg,
loaded_model=whisper_model,
mlx_encoder=self.mlx_encoder,

View File

@@ -1,4 +1,4 @@
from .whisper.decoding import PyTorchInference
from whisperlivekit.whisper.decoding import PyTorchInference
# extention of PyTorchInference for beam search
class BeamPyTorchInference(PyTorchInference):

View File

@@ -1,5 +1,3 @@
# This code was originally in simul_whisper/transcriber/simul_whisper.py . It is adapted a lot for SimulStreaming.
from dataclasses import dataclass, field
from typing import Literal

View File

@@ -1,43 +0,0 @@
class Tokens:
def __init__(self, tokens):
self.tokens = tokens
# def clone(self):
# return Tokens(self.tokens.clone())
def __str__(self):
return str(self.tokens.tolist())
def __repr__(self):
return self.__str__()
class BeamTokens(Tokens):
def __init__(self, tokens, beam_size):
self.tokens = tokens
self.beam_size = beam_size
def clone(self):
return BeamTokens(self.tokens.clone())
def __str__(self):
return f"BeamTokens({self.tokens.tolist()}, beam_size={self.beam_size})"
def __repr__(self):
return self.__str__()
def as_text(self, tokenizer):
return tokenizer.decode(self.tokens)
class Logits(Tokens):
def __init__(self, logits):
super().__init__(logits)
# def clone(self):
# return Logits(self.tokens.clone(), self.beam_size)
def __str__(self):
# return "abc"
return f"Logits({self.tokens.shape})"
def __repr__(self):
return self.__str__()

View File

@@ -1,48 +1,57 @@
# This code was originally in simul_whisper/transcriber/simul_whisper.py . It is adapted a lot for SimulStreaming.
import os
import logging
import torch
import torch.nn.functional as F
import numpy as np
from .whisper import load_model, DecodingOptions, tokenizer
from whisperlivekit.whisper import DecodingOptions, tokenizer
from .config import AlignAttConfig
from whisperlivekit.timed_objects import ASRToken
from .whisper.audio import log_mel_spectrogram, TOKENS_PER_SECOND, pad_or_trim, N_SAMPLES, N_FRAMES
from .whisper.timing import median_filter
from .whisper.decoding import GreedyDecoder, BeamSearchDecoder, SuppressTokens, detect_language
from whisperlivekit.whisper.audio import log_mel_spectrogram, TOKENS_PER_SECOND, pad_or_trim, N_SAMPLES, N_FRAMES
from whisperlivekit.whisper.timing import median_filter
from whisperlivekit.whisper.decoding import GreedyDecoder, BeamSearchDecoder, SuppressTokens
from .beam import BeamPyTorchInference
from .eow_detection import fire_at_boundary, load_cif
import os
from time import time
from .token_buffer import TokenBuffer
from whisperlivekit.backend_support import (
mlx_backend_available,
faster_backend_available,
)
import numpy as np
from ..timed_objects import PUNCTUATION_MARKS
from .generation_progress import *
DEC_PAD = 50257
logger = logging.getLogger(__name__)
try:
if mlx_backend_available():
from mlx_whisper.audio import log_mel_spectrogram as mlx_log_mel_spectrogram
from mlx_whisper.transcribe import pad_or_trim as mlx_pad_or_trim
HAS_MLX_WHISPER = True
except ImportError:
HAS_MLX_WHISPER = False
if HAS_MLX_WHISPER:
HAS_FASTER_WHISPER = False
else:
try:
from faster_whisper.audio import pad_or_trim as fw_pad_or_trim
from faster_whisper.feature_extractor import FeatureExtractor
HAS_FASTER_WHISPER = True
except ImportError:
HAS_FASTER_WHISPER = False
class PaddedAlignAttWhisper:
if faster_backend_available():
from faster_whisper.audio import pad_or_trim as fw_pad_or_trim
from faster_whisper.feature_extractor import FeatureExtractor
USE_MLCORE = False
def load_coreml_encoder():
try:
from coremltools.models import MLModel
except ImportError:
logger.warning("coremltools is not installed")
return None
COREML_ENCODER_PATH = os.environ.get("MLCORE_ENCODER_PATH", "whisperlivekit/whisper/whisper_encoder.mlpackage")
_coreml_encoder = MLModel(COREML_ENCODER_PATH)
spec = _coreml_encoder.get_spec()
_coreml_input_name = spec.description.input[0].name if spec.description.input else "mel"
_coreml_output_name = spec.description.output[0].name if spec.description.output else None
return _coreml_encoder, _coreml_input_name, _coreml_output_name
class AlignAtt:
def __init__(
self,
cfg: AlignAttConfig,
@@ -54,9 +63,13 @@ class PaddedAlignAttWhisper:
self.model = loaded_model
self.mlx_encoder = mlx_encoder
self.fw_encoder = fw_encoder
self.fw_encoder = fw_encoder
if fw_encoder:
self.fw_feature_extractor = FeatureExtractor(feature_size=self.model.dims.n_mels)
self.coreml_encoder_tuple = None
if USE_MLCORE:
self.coreml_encoder_tuple = load_coreml_encoder()
self.use_mlcore = self.coreml_encoder_tuple is not None
self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
@@ -253,7 +266,7 @@ class PaddedAlignAttWhisper:
logger.debug("Refreshing segment:")
self.init_tokens()
self.last_attend_frame = -self.cfg.rewind_threshold
self.detected_language = None
# self.detected_language = None
self.cumulative_time_offset = 0.0
self.init_context()
logger.debug(f"Context: {self.context}")
@@ -392,15 +405,28 @@ class PaddedAlignAttWhisper:
else:
input_segments = self.segments[0]
# if self.cfg.language == "auto" and self.reset_tokenizer_to_auto_next_call:
# logger.debug("Resetting tokenizer to auto for new sentence.")
# self.create_tokenizer(None)
# self.detected_language = None
# self.init_tokens()
# self.reset_tokenizer_to_auto_next_call = False
# NEW : we can use a different encoder, before using standart whisper for cross attention with the hooks on the decoder
beg_encode = time()
if self.use_mlcore:
coreml_encoder, coreml_input_name, coreml_output_name = self.coreml_encoder_tuple
mel_padded = log_mel_spectrogram(
input_segments,
n_mels=self.model.dims.n_mels,
padding=N_SAMPLES,
device="cpu",
).unsqueeze(0)
mel = pad_or_trim(mel_padded, N_FRAMES)
content_mel_len = int((mel_padded.shape[2] - mel.shape[2]) / 2)
mel_np = np.ascontiguousarray(mel.numpy())
ml_inputs = {coreml_input_name or "mel": mel_np}
coreml_outputs = coreml_encoder.predict(ml_inputs)
if coreml_output_name and coreml_output_name in coreml_outputs:
encoder_feature_np = coreml_outputs[coreml_output_name]
else:
encoder_feature_np = next(iter(coreml_outputs.values()))
encoder_feature = torch.as_tensor(
np.array(encoder_feature_np),
device=self.device,
)
if self.mlx_encoder:
mlx_mel_padded = mlx_log_mel_spectrogram(audio=input_segments.detach(), n_mels=self.model.dims.n_mels, padding=N_SAMPLES)
mlx_mel = mlx_pad_or_trim(mlx_mel_padded, N_FRAMES, axis=-2)
@@ -615,10 +641,9 @@ class PaddedAlignAttWhisper:
timestamp_idx += len(word_tokens)
timestamp_entry = ASRToken(
start=current_timestamp,
end=current_timestamp + 0.1,
start=round(current_timestamp, 2),
end=round(current_timestamp + 0.1, 2),
text= word,
probability=0.95,
speaker=self.speaker,
detected_language=self.detected_language
).with_offset(

View File

@@ -1,5 +1,5 @@
from dataclasses import dataclass, field
from typing import Optional, Any, List
from typing import Optional, List, Union, Dict, Any
from datetime import timedelta
PUNCTUATION_MARKS = {'.', '!', '?', '', '', ''}
@@ -8,22 +8,19 @@ def format_time(seconds: float) -> str:
"""Format seconds as HH:MM:SS."""
return str(timedelta(seconds=int(seconds)))
@dataclass
class TimedText:
class Timed:
start: Optional[float] = 0
end: Optional[float] = 0
@dataclass
class TimedText(Timed):
text: Optional[str] = ''
speaker: Optional[int] = -1
probability: Optional[float] = None
is_dummy: Optional[bool] = False
detected_language: Optional[str] = None
def is_punctuation(self):
return self.text.strip() in PUNCTUATION_MARKS
def overlaps_with(self, other: 'TimedText') -> bool:
return not (self.end <= other.start or other.end <= self.start)
def has_punctuation(self) -> bool:
return any(char in PUNCTUATION_MARKS for char in self.text.strip())
def is_within(self, other: 'TimedText') -> bool:
return other.contains_timespan(self)
@@ -31,27 +28,25 @@ class TimedText:
def duration(self) -> float:
return self.end - self.start
def contains_time(self, time: float) -> bool:
return self.start <= time <= self.end
def contains_timespan(self, other: 'TimedText') -> bool:
return self.start <= other.start and self.end >= other.end
def __bool__(self):
def __bool__(self) -> bool:
return bool(self.text)
def __str__(self) -> str:
return str(self.text)
@dataclass()
class ASRToken(TimedText):
corrected_speaker: Optional[int] = -1
validated_speaker: bool = False
validated_text: bool = False
validated_language: bool = False
def with_offset(self, offset: float) -> "ASRToken":
"""Return a new token with the time offset added."""
return ASRToken(self.start + offset, self.end + offset, self.text, self.speaker, self.probability, detected_language=self.detected_language)
return ASRToken(self.start + offset, self.end + offset, self.text, self.speaker, detected_language=self.detected_language)
def is_silence(self) -> bool:
return False
@dataclass
class Sentence(TimedText):
@@ -70,68 +65,94 @@ class Transcript(TimedText):
sep: Optional[str] = None,
offset: float = 0
) -> "Transcript":
"""Collapse multiple ASR tokens into a single transcript span."""
sep = sep if sep is not None else ' '
text = sep.join(token.text for token in tokens)
probability = sum(token.probability for token in tokens if token.probability) / len(tokens) if tokens else None
if tokens:
start = offset + tokens[0].start
end = offset + tokens[-1].end
else:
start = None
end = None
return cls(start, end, text, probability=probability)
return cls(start, end, text)
@dataclass
class SpeakerSegment(TimedText):
class SpeakerSegment(Timed):
"""Represents a segment of audio attributed to a specific speaker.
No text nor probability is associated with this segment.
"""
speaker: Optional[int] = -1
pass
@dataclass
class Translation(TimedText):
pass
def approximate_cut_at(self, cut_time):
"""
Each word in text is considered to be of duration (end-start)/len(words in text)
"""
if not self.text or not self.contains_time(cut_time):
return self, None
words = self.text.split()
num_words = len(words)
if num_words == 0:
return self, None
duration_per_word = self.duration() / num_words
cut_word_index = int((cut_time - self.start) / duration_per_word)
if cut_word_index >= num_words:
cut_word_index = num_words -1
text0 = " ".join(words[:cut_word_index])
text1 = " ".join(words[cut_word_index:])
segment0 = Translation(start=self.start, end=cut_time, text=text0)
segment1 = Translation(start=cut_time, end=self.end, text=text1)
return segment0, segment1
@dataclass
class Silence():
duration: float
start: Optional[float] = None
end: Optional[float] = None
duration: Optional[float] = None
is_starting: bool = False
has_ended: bool = False
def compute_duration(self) -> Optional[float]:
if self.start is None or self.end is None:
return None
self.duration = self.end - self.start
return self.duration
def is_silence(self) -> bool:
return True
@dataclass
class Segment(TimedText):
"""Generic contiguous span built from tokens or silence markers."""
start: Optional[float]
end: Optional[float]
text: Optional[str]
speaker: Optional[str]
@classmethod
def from_tokens(
cls,
tokens: List[Union[ASRToken, Silence]],
is_silence: bool = False
) -> Optional["Segment"]:
"""Return a normalized segment representing the provided tokens."""
if not tokens:
return None
start_token = tokens[0]
end_token = tokens[-1]
if is_silence:
return cls(
start=start_token.start,
end=end_token.end,
text=None,
speaker=-2
)
else:
return cls(
start=start_token.start,
end=end_token.end,
text=''.join(token.text for token in tokens),
speaker=-1,
detected_language=start_token.detected_language
)
def is_silence(self) -> bool:
"""True when this segment represents a silence gap."""
return self.speaker == -2
@dataclass
class Line(TimedText):
translation: str = ''
def to_dict(self):
_dict = {
def to_dict(self) -> Dict[str, Any]:
"""Serialize the line for frontend consumption."""
_dict: Dict[str, Any] = {
'speaker': int(self.speaker) if self.speaker != -1 else 1,
'text': self.text,
'start': format_time(self.start),
@@ -143,6 +164,33 @@ class Line(TimedText):
_dict['detected_language'] = self.detected_language
return _dict
def build_from_tokens(self, tokens: List[ASRToken]) -> "Line":
"""Populate line attributes from a contiguous token list."""
self.text = ''.join([token.text for token in tokens])
self.start = tokens[0].start
self.end = tokens[-1].end
self.speaker = 1
self.detected_language = tokens[0].detected_language
return self
def build_from_segment(self, segment: Segment) -> "Line":
"""Populate the line fields from a pre-built segment."""
self.text = segment.text
self.start = segment.start
self.end = segment.end
self.speaker = segment.speaker
self.detected_language = segment.detected_language
return self
def is_silent(self) -> bool:
return self.speaker == -2
class SilentLine(Line):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.speaker = -2
self.text = ''
@dataclass
class FrontData():
@@ -151,15 +199,18 @@ class FrontData():
lines: list[Line] = field(default_factory=list)
buffer_transcription: str = ''
buffer_diarization: str = ''
buffer_translation: str = ''
remaining_time_transcription: float = 0.
remaining_time_diarization: float = 0.
def to_dict(self):
_dict = {
def to_dict(self) -> Dict[str, Any]:
"""Serialize the front-end data payload."""
_dict: Dict[str, Any] = {
'status': self.status,
'lines': [line.to_dict() for line in self.lines if (line.text or line.speaker == -2)],
'buffer_transcription': self.buffer_transcription,
'buffer_diarization': self.buffer_diarization,
'buffer_translation': self.buffer_translation,
'remaining_time_transcription': self.remaining_time_transcription,
'remaining_time_diarization': self.remaining_time_diarization,
}
@@ -174,13 +225,22 @@ class ChangeSpeaker:
@dataclass
class State():
tokens: list = field(default_factory=list)
last_validated_token: int = 0
translation_validated_segments: list = field(default_factory=list)
translation_buffer: list = field(default_factory=list)
buffer_transcription: str = field(default_factory=Transcript)
"""Unified state class for audio processing.
Contains both persistent state (tokens, buffers) and temporary update buffers
(new_* fields) that are consumed by TokensAlignment.
"""
# Persistent state
tokens: List[ASRToken] = field(default_factory=list)
buffer_transcription: Transcript = field(default_factory=Transcript)
end_buffer: float = 0.0
end_attributed_speaker: float = 0.0
remaining_time_transcription: float = 0.0
remaining_time_diarization: float = 0.0
beg_loop: Optional[int] = None
# Temporary update buffers (consumed by TokensAlignment.update())
new_tokens: List[Union[ASRToken, Silence]] = field(default_factory=list)
new_translation: List[Any] = field(default_factory=list)
new_diarization: List[Any] = field(default_factory=list)
new_tokens_buffer: List[Any] = field(default_factory=list) # only when local agreement
new_translation_buffer= TimedText()

View File

@@ -0,0 +1,177 @@
from time import time
from typing import Optional, List, Tuple, Union, Any
from whisperlivekit.timed_objects import Line, SilentLine, ASRToken, SpeakerSegment, Silence, TimedText, Segment
class TokensAlignment:
def __init__(self, state: Any, args: Any, sep: Optional[str]) -> None:
self.state = state
self.diarization = args.diarization
self._tokens_index: int = 0
self._diarization_index: int = 0
self._translation_index: int = 0
self.all_tokens: List[ASRToken] = []
self.all_diarization_segments: List[SpeakerSegment] = []
self.all_translation_segments: List[Any] = []
self.new_tokens: List[ASRToken] = []
self.new_diarization: List[SpeakerSegment] = []
self.new_translation: List[Any] = []
self.new_translation_buffer: Union[TimedText, str] = TimedText()
self.new_tokens_buffer: List[Any] = []
self.sep: str = sep if sep is not None else ' '
self.beg_loop: Optional[float] = None
def update(self) -> None:
"""Drain state buffers into the running alignment context."""
self.new_tokens, self.state.new_tokens = self.state.new_tokens, []
self.new_diarization, self.state.new_diarization = self.state.new_diarization, []
self.new_translation, self.state.new_translation = self.state.new_translation, []
self.new_tokens_buffer, self.state.new_tokens_buffer = self.state.new_tokens_buffer, []
self.all_tokens.extend(self.new_tokens)
self.all_diarization_segments.extend(self.new_diarization)
self.all_translation_segments.extend(self.new_translation)
self.new_translation_buffer = self.state.new_translation_buffer
def add_translation(self, line: Line) -> None:
"""Append translated text segments that overlap with a line."""
for ts in self.all_translation_segments:
if ts.is_within(line):
line.translation += ts.text + (self.sep if ts.text else '')
elif line.translation:
break
def compute_punctuations_segments(self, tokens: Optional[List[ASRToken]] = None) -> List[Segment]:
"""Group tokens into segments split by punctuation and explicit silence."""
segments = []
segment_start_idx = 0
for i, token in enumerate(self.all_tokens):
if token.is_silence():
previous_segment = Segment.from_tokens(
tokens=self.all_tokens[segment_start_idx: i],
)
if previous_segment:
segments.append(previous_segment)
segment = Segment.from_tokens(
tokens=[token],
is_silence=True
)
segments.append(segment)
segment_start_idx = i+1
else:
if token.has_punctuation():
segment = Segment.from_tokens(
tokens=self.all_tokens[segment_start_idx: i+1],
)
segments.append(segment)
segment_start_idx = i+1
final_segment = Segment.from_tokens(
tokens=self.all_tokens[segment_start_idx:],
)
if final_segment:
segments.append(final_segment)
return segments
def concatenate_diar_segments(self) -> List[SpeakerSegment]:
"""Merge consecutive diarization slices that share the same speaker."""
if not self.all_diarization_segments:
return []
merged = [self.all_diarization_segments[0]]
for segment in self.all_diarization_segments[1:]:
if segment.speaker == merged[-1].speaker:
merged[-1].end = segment.end
else:
merged.append(segment)
return merged
@staticmethod
def intersection_duration(seg1: TimedText, seg2: TimedText) -> float:
"""Return the overlap duration between two timed segments."""
start = max(seg1.start, seg2.start)
end = min(seg1.end, seg2.end)
return max(0, end - start)
def get_lines_diarization(self) -> Tuple[List[Line], str]:
"""Build lines when diarization is enabled and track overflow buffer."""
diarization_buffer = ''
punctuation_segments = self.compute_punctuations_segments()
diarization_segments = self.concatenate_diar_segments()
for punctuation_segment in punctuation_segments:
if not punctuation_segment.is_silence():
if diarization_segments and punctuation_segment.start >= diarization_segments[-1].end:
diarization_buffer += punctuation_segment.text
else:
max_overlap = 0.0
max_overlap_speaker = 1
for diarization_segment in diarization_segments:
intersec = self.intersection_duration(punctuation_segment, diarization_segment)
if intersec > max_overlap:
max_overlap = intersec
max_overlap_speaker = diarization_segment.speaker + 1
punctuation_segment.speaker = max_overlap_speaker
lines = []
if punctuation_segments:
lines = [Line().build_from_segment(punctuation_segments[0])]
for segment in punctuation_segments[1:]:
if segment.speaker == lines[-1].speaker:
if lines[-1].text:
lines[-1].text += segment.text
lines[-1].end = segment.end
else:
lines.append(Line().build_from_segment(segment))
return lines, diarization_buffer
def get_lines(
self,
diarization: bool = False,
translation: bool = False,
current_silence: Optional[Silence] = None
) -> Tuple[List[Line], str, Union[str, TimedText]]:
"""Return the formatted lines plus buffers, optionally with diarization/translation."""
if diarization:
lines, diarization_buffer = self.get_lines_diarization()
else:
diarization_buffer = ''
lines = []
current_line_tokens = []
for token in self.all_tokens:
if token.is_silence():
if current_line_tokens:
lines.append(Line().build_from_tokens(current_line_tokens))
current_line_tokens = []
end_silence = token.end if token.has_ended else time() - self.beg_loop
if lines and lines[-1].is_silent():
lines[-1].end = end_silence
else:
lines.append(SilentLine(
start = token.start,
end = end_silence
))
else:
current_line_tokens.append(token)
if current_line_tokens:
lines.append(Line().build_from_tokens(current_line_tokens))
if current_silence:
end_silence = current_silence.end if current_silence.has_ended else time() - self.beg_loop
if lines and lines[-1].is_silent():
lines[-1].end = end_silence
else:
lines.append(SilentLine(
start = current_silence.start,
end = end_silence
))
if translation:
[self.add_translation(line) for line in lines if not type(line) == Silence]
return lines, diarization_buffer, self.new_translation_buffer.text

View File

@@ -1,60 +0,0 @@
from typing import Sequence, Callable, Any, Optional, Dict
def _detect_tail_repetition(
seq: Sequence[Any],
key: Callable[[Any], Any] = lambda x: x, # extract comparable value
min_block: int = 1, # set to 2 to ignore 1-token loops like "."
max_tail: int = 300, # search window from the end for speed
prefer: str = "longest", # "longest" coverage or "smallest" block
) -> Optional[Dict]:
vals = [key(x) for x in seq][-max_tail:]
n = len(vals)
best = None
# try every possible block length
for b in range(min_block, n // 2 + 1):
block = vals[-b:]
# count how many times this block repeats contiguously at the very end
count, i = 0, n
while i - b >= 0 and vals[i - b:i] == block:
count += 1
i -= b
if count >= 2:
cand = {
"block_size": b,
"count": count,
"start_index": len(seq) - count * b, # in original seq
"end_index": len(seq),
}
if (best is None or
(prefer == "longest" and count * b > best["count"] * best["block_size"]) or
(prefer == "smallest" and b < best["block_size"])):
best = cand
return best
def trim_tail_repetition(
seq: Sequence[Any],
key: Callable[[Any], Any] = lambda x: x,
min_block: int = 1,
max_tail: int = 300,
prefer: str = "longest",
keep: int = 1, # how many copies of the repeating block to keep at the end (0 or 1 are common)
):
"""
Returns a new sequence with repeated tail trimmed.
keep=1 -> keep a single copy of the repeated block.
keep=0 -> remove all copies of the repeated block.
"""
rep = _detect_tail_repetition(seq, key, min_block, max_tail, prefer)
if not rep:
return seq, False # nothing to trim
b, c = rep["block_size"], rep["count"]
if keep < 0:
keep = 0
if keep >= c:
return seq, False # nothing to trim (already <= keep copies)
# new length = total - (copies_to_remove * block_size)
new_len = len(seq) - (c - keep) * b
return seq[:new_len], True

View File

@@ -490,6 +490,11 @@ label {
margin-left: 4px;
}
.buffer_translation {
color: #a0a0a0;
margin-left: 6px;
}
.spinner {
display: inline-block;
width: 8px;

View File

@@ -232,10 +232,11 @@ function setupWebSocket() {
if (waitingForStop) {
statusText.textContent = "Processing finalized or connection closed.";
if (lastReceivedData) {
renderLinesWithBuffer(
renderLinesWithBuffer(
lastReceivedData.lines || [],
lastReceivedData.buffer_diarization || "",
lastReceivedData.buffer_transcription || "",
lastReceivedData.buffer_translation || "",
0,
0,
true
@@ -281,6 +282,7 @@ function setupWebSocket() {
lastReceivedData.lines || [],
lastReceivedData.buffer_diarization || "",
lastReceivedData.buffer_transcription || "",
lastReceivedData.buffer_translation || "",
0,
0,
true
@@ -301,6 +303,7 @@ function setupWebSocket() {
lines = [],
buffer_transcription = "",
buffer_diarization = "",
buffer_translation = "",
remaining_time_transcription = 0,
remaining_time_diarization = 0,
status = "active_transcription",
@@ -310,6 +313,7 @@ function setupWebSocket() {
lines,
buffer_diarization,
buffer_transcription,
buffer_translation,
remaining_time_diarization,
remaining_time_transcription,
false,
@@ -323,6 +327,7 @@ function renderLinesWithBuffer(
lines,
buffer_diarization,
buffer_transcription,
buffer_translation,
remaining_time_diarization,
remaining_time_transcription,
isFinalizing = false,
@@ -341,6 +346,7 @@ function renderLinesWithBuffer(
lines: (lines || []).map((it) => ({ speaker: it.speaker, text: it.text, start: it.start, end: it.end, detected_language: it.detected_language })),
buffer_transcription: buffer_transcription || "",
buffer_diarization: buffer_diarization || "",
buffer_translation: buffer_translation,
status: current_status,
showLoading,
showTransLag,
@@ -385,12 +391,11 @@ function renderLinesWithBuffer(
if (idx === lines.length - 1) {
if (!isFinalizing && item.speaker !== -2) {
if (remaining_time_transcription > 0) {
speakerLabel += `<span class="label_transcription"><span class="spinner"></span>Transcription lag <span id='timeInfo'><span class="lag-transcription-value">${fmt1(
remaining_time_transcription
)}</span>s</span></span>`;
}
if (buffer_diarization && remaining_time_diarization > 0) {
if (buffer_diarization && remaining_time_diarization) {
speakerLabel += `<span class="label_diarization"><span class="spinner"></span>Diarization lag<span id='timeInfo'><span class="lag-diarization-value">${fmt1(
remaining_time_diarization
)}</span>s</span></span>`;
@@ -415,13 +420,22 @@ function renderLinesWithBuffer(
}
}
}
let translationContent = "";
if (item.translation) {
translationContent += item.translation.trim();
}
if (idx === lines.length - 1 && buffer_translation) {
const bufferPiece = isFinalizing
? buffer_translation
: `<span class="buffer_translation">${buffer_translation}</span>`;
translationContent += translationContent ? `${bufferPiece}` : bufferPiece;
}
if (translationContent.trim().length > 0) {
currentLineText += `
<div>
<div class="label_translation">
${translationIcon}
<span>${item.translation}</span>
<span class="translation_text">${translationContent}</span>
</div>
</div>`;
}

View File

@@ -4,17 +4,18 @@ import json
import os
import urllib
import warnings
from typing import List, Optional, Union, Dict
from typing import Dict, List, Optional, Union
import torch
from tqdm import tqdm
from pathlib import Path
from torch import Tensor
from .audio import load_audio, log_mel_spectrogram, pad_or_trim
from .decoding import DecodingOptions, DecodingResult, decode, detect_language
from .model import ModelDimensions, Whisper
from .transcribe import transcribe
from .version import __version__
from whisperlivekit.whisper.audio import load_audio, log_mel_spectrogram, pad_or_trim
from whisperlivekit.whisper.decoding import DecodingOptions, DecodingResult, decode, detect_language
from whisperlivekit.whisper.model import ModelDimensions, Whisper
from whisperlivekit.whisper.transcribe import transcribe
from whisperlivekit.whisper.version import __version__
_MODELS = {
"tiny.en": "https://openaipublic.azureedge.net/main/whisper/models/d3dd57d32accea0b295c96e26691aa14d8822fac7d9d27d5dc00b4ca2826dd03/tiny.en.pt",
@@ -233,13 +234,97 @@ def _convert_hf_state_dict(state_dict: Dict[str, torch.Tensor]) -> Dict[str, tor
return converted if converted else state_dict
def _load_lora_state(lora_path: str):
safe_path = os.path.join(lora_path, "adapter_model.safetensors")
bin_path = os.path.join(lora_path, "adapter_model.bin")
if os.path.isfile(safe_path):
try:
from safetensors.torch import load_file
except ImportError as exc:
raise ImportError(
"Loading LoRA adapters stored as .safetensors requires the `safetensors` package."
) from exc
return load_file(safe_path)
if os.path.isfile(bin_path):
return torch.load(bin_path, map_location="cpu")
raise FileNotFoundError(
f"No adapter weights found under {lora_path}. Expected adapter_model.safetensors or adapter_model.bin."
)
def _collapse_hf_module_name(module: str):
if module.startswith("base_model."):
module = module[len("base_model.") :]
if module.startswith("model.model."):
module = module[len("model.") :]
if not module.startswith("model."):
module = f"model.{module}"
return module
def _apply_lora_adapter(state_dict: Dict[str, Tensor], lora_path: Optional[str]):
if not lora_path:
return
config_path = os.path.join(lora_path, "adapter_config.json")
if not os.path.isfile(config_path):
raise FileNotFoundError(f"Missing adapter_config.json inside {lora_path}")
with open(config_path, "r", encoding="utf-8") as handle:
config = json.load(handle)
if config.get("peft_type") != "LORA":
raise ValueError("Only LoRA adapters are supported.")
r = config.get("r")
alpha = config.get("lora_alpha") or config.get("alpha")
if not r or not alpha:
raise ValueError("LoRA config must include `r` and `lora_alpha`.")
scaling = alpha / r
adapter_state = _load_lora_state(lora_path)
lora_layers: Dict[str, Dict[str, Tensor]] = {}
for key, tensor in adapter_state.items():
if key.endswith("lora_A.weight"):
module = key[: -len(".lora_A.weight")]
lora_layers.setdefault(module, {})["A"] = tensor
elif key.endswith("lora_B.weight"):
module = key[: -len(".lora_B.weight")]
lora_layers.setdefault(module, {})["B"] = tensor
if not lora_layers:
raise ValueError(f"No LoRA tensors found in {lora_path}")
for module, parts in lora_layers.items():
if "A" not in parts or "B" not in parts:
raise ValueError(f"Incomplete LoRA tensors for module '{module}'")
hf_module = _collapse_hf_module_name(module)
hf_weight_key = f"{hf_module}.weight"
delta = parts["B"] @ parts["A"]
delta = delta * scaling
converted = _convert_hf_state_dict({hf_weight_key: delta})
if not converted:
raise KeyError(f"Failed to map LoRA module '{module}' into Whisper state dict.")
target_name, delta_tensor = next(iter(converted.items()))
if target_name not in state_dict:
raise KeyError(
f"LoRA module '{module}' mapped to '{target_name}', but the base model has no such parameter."
)
state_dict[target_name] = state_dict[target_name] + delta_tensor.to(
dtype=state_dict[target_name].dtype, device=state_dict[target_name].device
)
def load_model(
name: str,
device: Optional[Union[str, torch.device]] = None,
download_root: str = None,
in_memory: bool = False,
decoder_only=False,
custom_alignment_heads=None
decoder_only: bool = False,
custom_alignment_heads: Optional[str] = None,
lora_path: Optional[str] = None,
) -> Whisper:
"""
Load a Whisper ASR model
@@ -255,6 +340,8 @@ def load_model(
path to download the model files; by default, it uses "~/.cache/whisper"
in_memory: bool
whether to preload the model weights into host memory
lora_path: str
optional directory containing PEFT LoRA adapter weights (adapter_config + adapter_model)
Returns
-------
@@ -302,6 +389,7 @@ def load_model(
else:
state_dict = checkpoint
state_dict = _convert_hf_state_dict(state_dict)
_apply_lora_adapter(state_dict, lora_path)
if dims_cfg is not None:
dims = ModelDimensions(**dims_cfg)
@@ -329,3 +417,47 @@ def load_model(
model.set_alignment_heads(alignment_heads)
return model.to(device)
def convert_encoder_to_coreml(
model_name = "base",
output_path= "whisper_encoder.mlpackage",
dummy_frames = 3000, #Number of time frames to use for the dummy mel input during tracing
precision = "float16",
):
import coremltools as ct
model = load_model(model_name, device="cpu", decoder_only=False)
encoder = model.encoder.eval().cpu()
dummy_input = torch.randn(
1,
model.dims.n_mels,
dummy_frames,
dtype=next(encoder.parameters()).dtype,
)
with torch.no_grad():
traced_encoder = torch.jit.trace(encoder, dummy_input)
precision_map = {
"float16": ct.precision.FLOAT16,
"fp16": ct.precision.FLOAT16,
"float32": ct.precision.FLOAT32,
"fp32": ct.precision.FLOAT32,
}
coreml_precision = precision_map[precision.lower()]
mlmodel = ct.convert(
traced_encoder,
inputs=[ct.TensorType(name="mel", shape=dummy_input.shape)],
convert_to= "mlprogram",
compute_precision=coreml_precision,
)
output_path = Path(output_path)
mlmodel.save(str(output_path))
return output_path
# if __name__ == "__main__":
# convert_encoder_to_coreml(model_name="tiny", output_path="whisper_encoder.mlpackage", dummy_frames=3000, precision="float16", convert_to="mlprogram")

View File

@@ -1,122 +0,0 @@
#!/usr/bin/env python3
import sys
import numpy as np
import librosa
from functools import lru_cache
import time
import logging
from .backends import FasterWhisperASR, MLXWhisper, WhisperTimestampedASR, OpenaiApiASR
from whisperlivekit.warmup import warmup_asr
logger = logging.getLogger(__name__)
WHISPER_LANG_CODES = "af,am,ar,as,az,ba,be,bg,bn,bo,br,bs,ca,cs,cy,da,de,el,en,es,et,eu,fa,fi,fo,fr,gl,gu,ha,haw,he,hi,hr,ht,hu,hy,id,is,it,ja,jw,ka,kk,km,kn,ko,la,lb,ln,lo,lt,lv,mg,mi,mk,ml,mn,mr,ms,mt,my,ne,nl,nn,no,oc,pa,pl,ps,pt,ro,ru,sa,sd,si,sk,sl,sn,so,sq,sr,su,sv,sw,ta,te,tg,th,tk,tl,tr,tt,uk,ur,uz,vi,yi,yo,zh".split(
","
)
def create_tokenizer(lan):
"""returns an object that has split function that works like the one of MosesTokenizer"""
assert (
lan in WHISPER_LANG_CODES
), "language must be Whisper's supported lang code: " + " ".join(WHISPER_LANG_CODES)
if lan == "uk":
import tokenize_uk
class UkrainianTokenizer:
def split(self, text):
return tokenize_uk.tokenize_sents(text)
return UkrainianTokenizer()
# supported by fast-mosestokenizer
if (
lan
in "as bn ca cs de el en es et fi fr ga gu hi hu is it kn lt lv ml mni mr nl or pa pl pt ro ru sk sl sv ta te yue zh".split()
):
from mosestokenizer import MosesSentenceSplitter
return MosesSentenceSplitter(lan)
# the following languages are in Whisper, but not in wtpsplit:
if (
lan
in "as ba bo br bs fo haw hr ht jw lb ln lo mi nn oc sa sd sn so su sw tk tl tt".split()
):
logger.debug(
f"{lan} code is not supported by wtpsplit. Going to use None lang_code option."
)
lan = None
from wtpsplit import WtP
# downloads the model from huggingface on the first use
wtp = WtP("wtp-canine-s-12l-no-adapters")
class WtPtok:
def split(self, sent):
return wtp.split(sent, lang_code=lan)
return WtPtok()
def backend_factory(
backend,
lan,
model_size,
model_cache_dir,
model_dir,
task,
buffer_trimming,
buffer_trimming_sec,
confidence_validation,
warmup_file=None,
min_chunk_size=None,
):
backend = backend
if backend == "openai-api":
logger.debug("Using OpenAI API.")
asr = OpenaiApiASR(lan=lan)
else:
if backend == "faster-whisper":
asr_cls = FasterWhisperASR
elif backend == "mlx-whisper":
asr_cls = MLXWhisper
else:
asr_cls = WhisperTimestampedASR
# Only for FasterWhisperASR and WhisperTimestampedASR
t = time.time()
logger.info(f"Loading Whisper {model_size} model for language {lan}...")
asr = asr_cls(
model_size=model_size,
lan=lan,
cache_dir=model_cache_dir,
model_dir=model_dir,
)
e = time.time()
logger.info(f"done. It took {round(e-t,2)} seconds.")
if task == "translate":
tgt_language = "en" # Whisper translates into English
else:
tgt_language = lan # Whisper transcribes in this language
# Create the tokenizer
if buffer_trimming == "sentence":
tokenizer = create_tokenizer(tgt_language)
else:
tokenizer = None
warmup_asr(asr, warmup_file)
asr.confidence_validation = confidence_validation
asr.tokenizer = tokenizer
asr.buffer_trimming = buffer_trimming
asr.buffer_trimming_sec = buffer_trimming_sec
return asr