mirror of
https://github.com/QuentinFuxa/WhisperLiveKit.git
synced 2026-03-07 14:23:18 +00:00
Test suite covering: - metrics.py: WER computation, timestamp accuracy, text normalization - config.py: defaults, .en model detection, policy aliases, from_namespace - timed_objects.py: ASRToken, Silence, Transcript, Segment, FrontData - hypothesis_buffer.py: insert, flush, LCP matching, pop_committed - silence_handling.py: state machine, double-counting regression test - audio_processor.py: async pipeline with MockOnlineProcessor All tests run in ~1.3s without downloading any ASR models. Add pytest and pytest-asyncio as optional test dependencies. Update .gitignore to allow tests/ directory.
210 lines
6.7 KiB
Python
210 lines
6.7 KiB
Python
"""Tests for AudioProcessor pipeline with mocked ASR backends.
|
|
|
|
These tests verify the async audio processing pipeline works correctly
|
|
without requiring any real ASR models to be loaded.
|
|
"""
|
|
|
|
import asyncio
|
|
from types import SimpleNamespace
|
|
from unittest.mock import patch
|
|
|
|
import numpy as np
|
|
import pytest
|
|
|
|
from whisperlivekit.timed_objects import ASRToken, Transcript
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Mock ASR components
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class MockASR:
|
|
"""Mock ASR model holder."""
|
|
sep = " "
|
|
SAMPLING_RATE = 16000
|
|
|
|
def __init__(self):
|
|
self.transcribe_kargs = {}
|
|
self.original_language = "en"
|
|
self.backend_choice = "mock"
|
|
|
|
def transcribe(self, audio):
|
|
return None
|
|
|
|
|
|
class MockOnlineProcessor:
|
|
"""Mock online processor that returns canned tokens."""
|
|
SAMPLING_RATE = 16000
|
|
|
|
def __init__(self, asr=None):
|
|
self.asr = asr or MockASR()
|
|
self.audio_buffer = np.array([], dtype=np.float32)
|
|
self.end = 0.0
|
|
self._call_count = 0
|
|
self._finished = False
|
|
|
|
def insert_audio_chunk(self, audio, audio_stream_end_time):
|
|
self.audio_buffer = np.append(self.audio_buffer, audio)
|
|
self.end = audio_stream_end_time
|
|
|
|
def process_iter(self, is_last=False):
|
|
self._call_count += 1
|
|
# Emit a token on every call when we have audio
|
|
if len(self.audio_buffer) > 0:
|
|
t = self._call_count * 0.5
|
|
return [ASRToken(start=t, end=t + 0.5, text=f"word{self._call_count}")], self.end
|
|
return [], self.end
|
|
|
|
def get_buffer(self):
|
|
return Transcript(start=None, end=None, text="")
|
|
|
|
def start_silence(self):
|
|
return [], self.end
|
|
|
|
def end_silence(self, silence_duration, offset):
|
|
pass
|
|
|
|
def new_speaker(self, change_speaker):
|
|
pass
|
|
|
|
def finish(self):
|
|
self._finished = True
|
|
return [], self.end
|
|
|
|
def warmup(self, audio, init_prompt=""):
|
|
pass
|
|
|
|
|
|
def _make_pcm_bytes(duration_s=0.1, sample_rate=16000):
|
|
"""Generate silent PCM s16le bytes."""
|
|
n_samples = int(duration_s * sample_rate)
|
|
audio = np.zeros(n_samples, dtype=np.float32)
|
|
return (audio * 32768).clip(-32768, 32767).astype(np.int16).tobytes()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.fixture
|
|
def mock_engine():
|
|
"""Create a mock TranscriptionEngine-like object."""
|
|
engine = SimpleNamespace(
|
|
asr=MockASR(),
|
|
diarization_model=None,
|
|
translation_model=None,
|
|
args=SimpleNamespace(
|
|
diarization=False,
|
|
transcription=True,
|
|
target_language="",
|
|
vac=False,
|
|
vac_chunk_size=0.04,
|
|
min_chunk_size=0.1,
|
|
pcm_input=True,
|
|
punctuation_split=False,
|
|
backend="mock",
|
|
backend_policy="localagreement",
|
|
vad=True,
|
|
model_size="base",
|
|
lan="en",
|
|
),
|
|
)
|
|
return engine
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestPCMConversion:
|
|
"""Test PCM byte conversion without needing the full pipeline."""
|
|
|
|
def test_s16le_roundtrip(self):
|
|
"""Convert float32 → s16le → float32 and verify approximate roundtrip."""
|
|
original = np.array([0.0, 0.5, -0.5, 1.0, -1.0], dtype=np.float32)
|
|
s16 = (original * 32768).clip(-32768, 32767).astype(np.int16)
|
|
pcm_bytes = s16.tobytes()
|
|
# Direct numpy conversion (same logic as AudioProcessor.convert_pcm_to_float)
|
|
recovered = np.frombuffer(pcm_bytes, dtype=np.int16).astype(np.float32) / 32768.0
|
|
|
|
np.testing.assert_allclose(recovered, original, atol=1 / 32768)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
class TestPipelineBasics:
|
|
async def test_feed_audio_and_get_responses(self, mock_engine):
|
|
"""Feed audio through the pipeline and verify we get responses."""
|
|
from whisperlivekit.audio_processor import AudioProcessor
|
|
|
|
with patch("whisperlivekit.audio_processor.online_factory", return_value=MockOnlineProcessor()):
|
|
processor = AudioProcessor(transcription_engine=mock_engine)
|
|
results_gen = await processor.create_tasks()
|
|
|
|
responses = []
|
|
|
|
async def collect():
|
|
async for resp in results_gen:
|
|
responses.append(resp)
|
|
|
|
task = asyncio.create_task(collect())
|
|
|
|
# Feed 2 seconds of audio in 100ms chunks
|
|
for _ in range(20):
|
|
await processor.process_audio(_make_pcm_bytes(0.1))
|
|
|
|
# Signal EOF
|
|
await processor.process_audio(None)
|
|
|
|
await asyncio.wait_for(task, timeout=10.0)
|
|
await processor.cleanup()
|
|
|
|
# We should have gotten at least one response
|
|
assert len(responses) > 0
|
|
|
|
async def test_eof_terminates_pipeline(self, mock_engine):
|
|
"""Sending None (EOF) should cleanly terminate the pipeline."""
|
|
from whisperlivekit.audio_processor import AudioProcessor
|
|
|
|
with patch("whisperlivekit.audio_processor.online_factory", return_value=MockOnlineProcessor()):
|
|
processor = AudioProcessor(transcription_engine=mock_engine)
|
|
results_gen = await processor.create_tasks()
|
|
|
|
responses = []
|
|
|
|
async def collect():
|
|
async for resp in results_gen:
|
|
responses.append(resp)
|
|
|
|
task = asyncio.create_task(collect())
|
|
|
|
# Send a small amount of audio then EOF
|
|
await processor.process_audio(_make_pcm_bytes(0.5))
|
|
await processor.process_audio(None)
|
|
|
|
await asyncio.wait_for(task, timeout=10.0)
|
|
await processor.cleanup()
|
|
|
|
# Pipeline should have terminated without error
|
|
assert task.done()
|
|
|
|
async def test_empty_audio_no_crash(self, mock_engine):
|
|
"""Sending EOF immediately (no audio) should not crash."""
|
|
from whisperlivekit.audio_processor import AudioProcessor
|
|
|
|
with patch("whisperlivekit.audio_processor.online_factory", return_value=MockOnlineProcessor()):
|
|
processor = AudioProcessor(transcription_engine=mock_engine)
|
|
results_gen = await processor.create_tasks()
|
|
|
|
responses = []
|
|
|
|
async def collect():
|
|
async for resp in results_gen:
|
|
responses.append(resp)
|
|
|
|
task = asyncio.create_task(collect())
|
|
await processor.process_audio(None)
|
|
|
|
await asyncio.wait_for(task, timeout=10.0)
|
|
await processor.cleanup()
|
|
assert task.done()
|