mirror of
https://github.com/QuentinFuxa/WhisperLiveKit.git
synced 2026-03-21 16:40:35 +00:00
Compare commits
8 Commits
0.2.9
...
ScriptProc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4d7c487614 | ||
|
|
2a27d2030a | ||
|
|
cd160caaa1 | ||
|
|
d27b5eb23e | ||
|
|
f9d704a900 | ||
|
|
2f6e00f512 | ||
|
|
5aa312e437 | ||
|
|
ebaf36a8be |
@@ -4,9 +4,8 @@ from time import time, sleep
|
|||||||
import math
|
import math
|
||||||
import logging
|
import logging
|
||||||
import traceback
|
import traceback
|
||||||
from whisperlivekit.timed_objects import ASRToken, Silence, Line
|
from whisperlivekit.timed_objects import ASRToken, Silence, Line, FrontData, State
|
||||||
from whisperlivekit.core import TranscriptionEngine, online_factory, online_diarization_factory, online_translation_factory
|
from whisperlivekit.core import TranscriptionEngine, online_factory, online_diarization_factory, online_translation_factory
|
||||||
from whisperlivekit.ffmpeg_manager import FFmpegManager, FFmpegState
|
|
||||||
from whisperlivekit.silero_vad_iterator import FixedVADIterator
|
from whisperlivekit.silero_vad_iterator import FixedVADIterator
|
||||||
from whisperlivekit.results_formater import format_output
|
from whisperlivekit.results_formater import format_output
|
||||||
# Set up logging once
|
# Set up logging once
|
||||||
@@ -49,10 +48,7 @@ class AudioProcessor:
|
|||||||
self.bytes_per_sample = 2
|
self.bytes_per_sample = 2
|
||||||
self.bytes_per_sec = self.samples_per_sec * self.bytes_per_sample
|
self.bytes_per_sec = self.samples_per_sec * self.bytes_per_sample
|
||||||
self.max_bytes_per_sec = 32000 * 5 # 5 seconds of audio at 32 kHz
|
self.max_bytes_per_sec = 32000 * 5 # 5 seconds of audio at 32 kHz
|
||||||
self.last_ffmpeg_activity = time()
|
self.is_pcm_input = True
|
||||||
self.ffmpeg_health_check_interval = 5
|
|
||||||
self.ffmpeg_max_idle_time = 10
|
|
||||||
self.is_pcm_input = self.args.pcm_input
|
|
||||||
self.debug = False
|
self.debug = False
|
||||||
|
|
||||||
# State management
|
# State management
|
||||||
@@ -68,7 +64,7 @@ class AudioProcessor:
|
|||||||
self.lock = asyncio.Lock()
|
self.lock = asyncio.Lock()
|
||||||
self.beg_loop = None #to deal with a potential little lag at the websocket initialization, this is now set in process_audio
|
self.beg_loop = None #to deal with a potential little lag at the websocket initialization, this is now set in process_audio
|
||||||
self.sep = " " # Default separator
|
self.sep = " " # Default separator
|
||||||
self.last_response_content = ""
|
self.last_response_content = FrontData()
|
||||||
|
|
||||||
# Models and processing
|
# Models and processing
|
||||||
self.asr = models.asr
|
self.asr = models.asr
|
||||||
@@ -79,18 +75,6 @@ class AudioProcessor:
|
|||||||
else:
|
else:
|
||||||
self.vac = None
|
self.vac = None
|
||||||
|
|
||||||
self.ffmpeg_manager = FFmpegManager(
|
|
||||||
sample_rate=self.sample_rate,
|
|
||||||
channels=self.channels
|
|
||||||
)
|
|
||||||
|
|
||||||
async def handle_ffmpeg_error(error_type: str):
|
|
||||||
logger.error(f"FFmpeg error: {error_type}")
|
|
||||||
self._ffmpeg_error = error_type
|
|
||||||
|
|
||||||
self.ffmpeg_manager.on_error_callback = handle_ffmpeg_error
|
|
||||||
self._ffmpeg_error = None
|
|
||||||
|
|
||||||
self.transcription_queue = asyncio.Queue() if self.args.transcription else None
|
self.transcription_queue = asyncio.Queue() if self.args.transcription else None
|
||||||
self.diarization_queue = asyncio.Queue() if self.args.diarization else None
|
self.diarization_queue = asyncio.Queue() if self.args.diarization else None
|
||||||
self.translation_queue = asyncio.Queue() if self.args.target_language else None
|
self.translation_queue = asyncio.Queue() if self.args.target_language else None
|
||||||
@@ -98,12 +82,12 @@ class AudioProcessor:
|
|||||||
|
|
||||||
self.transcription_task = None
|
self.transcription_task = None
|
||||||
self.diarization_task = None
|
self.diarization_task = None
|
||||||
self.ffmpeg_reader_task = None
|
|
||||||
self.watchdog_task = None
|
self.watchdog_task = None
|
||||||
self.all_tasks_for_cleanup = []
|
self.all_tasks_for_cleanup = []
|
||||||
|
|
||||||
if self.args.transcription:
|
if self.args.transcription:
|
||||||
self.online = online_factory(self.args, models.asr, models.tokenizer)
|
self.online = online_factory(self.args, models.asr, models.tokenizer)
|
||||||
|
self.sep = self.online.asr.sep
|
||||||
if self.args.diarization:
|
if self.args.diarization:
|
||||||
self.diarization = online_diarization_factory(self.args, models.diarization_model)
|
self.diarization = online_diarization_factory(self.args, models.diarization_model)
|
||||||
if self.args.target_language:
|
if self.args.target_language:
|
||||||
@@ -113,13 +97,12 @@ class AudioProcessor:
|
|||||||
"""Convert PCM buffer in s16le format to normalized NumPy array."""
|
"""Convert PCM buffer in s16le format to normalized NumPy array."""
|
||||||
return np.frombuffer(pcm_buffer, dtype=np.int16).astype(np.float32) / 32768.0
|
return np.frombuffer(pcm_buffer, dtype=np.int16).astype(np.float32) / 32768.0
|
||||||
|
|
||||||
async def update_transcription(self, new_tokens, buffer, end_buffer, sep):
|
async def update_transcription(self, new_tokens, buffer, end_buffer):
|
||||||
"""Thread-safe update of transcription with new data."""
|
"""Thread-safe update of transcription with new data."""
|
||||||
async with self.lock:
|
async with self.lock:
|
||||||
self.tokens.extend(new_tokens)
|
self.tokens.extend(new_tokens)
|
||||||
self.buffer_transcription = buffer
|
self.buffer_transcription = buffer
|
||||||
self.end_buffer = end_buffer
|
self.end_buffer = end_buffer
|
||||||
self.sep = sep
|
|
||||||
|
|
||||||
async def update_diarization(self, end_attributed_speaker, buffer_diarization=""):
|
async def update_diarization(self, end_attributed_speaker, buffer_diarization=""):
|
||||||
"""Thread-safe update of diarization with new data."""
|
"""Thread-safe update of diarization with new data."""
|
||||||
@@ -152,17 +135,16 @@ class AudioProcessor:
|
|||||||
latest_end = max(self.end_buffer, self.tokens[-1].end if self.tokens else 0)
|
latest_end = max(self.end_buffer, self.tokens[-1].end if self.tokens else 0)
|
||||||
remaining_diarization = max(0, round(latest_end - self.end_attributed_speaker, 1))
|
remaining_diarization = max(0, round(latest_end - self.end_attributed_speaker, 1))
|
||||||
|
|
||||||
return {
|
return State(
|
||||||
"tokens": self.tokens.copy(),
|
tokens=self.tokens.copy(),
|
||||||
"translated_segments": self.translated_segments.copy(),
|
translated_segments=self.translated_segments.copy(),
|
||||||
"buffer_transcription": self.buffer_transcription,
|
buffer_transcription=self.buffer_transcription,
|
||||||
"buffer_diarization": self.buffer_diarization,
|
buffer_diarization=self.buffer_diarization,
|
||||||
"end_buffer": self.end_buffer,
|
end_buffer=self.end_buffer,
|
||||||
"end_attributed_speaker": self.end_attributed_speaker,
|
end_attributed_speaker=self.end_attributed_speaker,
|
||||||
"sep": self.sep,
|
remaining_time_transcription=remaining_transcription,
|
||||||
"remaining_time_transcription": remaining_transcription,
|
remaining_time_diarization=remaining_diarization
|
||||||
"remaining_time_diarization": remaining_diarization
|
)
|
||||||
}
|
|
||||||
|
|
||||||
async def reset(self):
|
async def reset(self):
|
||||||
"""Reset all state variables to initial values."""
|
"""Reset all state variables to initial values."""
|
||||||
@@ -173,70 +155,8 @@ class AudioProcessor:
|
|||||||
self.end_buffer = self.end_attributed_speaker = 0
|
self.end_buffer = self.end_attributed_speaker = 0
|
||||||
self.beg_loop = time()
|
self.beg_loop = time()
|
||||||
|
|
||||||
async def ffmpeg_stdout_reader(self):
|
|
||||||
"""Read audio data from FFmpeg stdout and process it."""
|
|
||||||
beg = time()
|
|
||||||
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
# Check if FFmpeg is running
|
|
||||||
state = await self.ffmpeg_manager.get_state()
|
|
||||||
if state == FFmpegState.FAILED:
|
|
||||||
logger.error("FFmpeg is in FAILED state, cannot read data")
|
|
||||||
break
|
|
||||||
elif state == FFmpegState.STOPPED:
|
|
||||||
logger.info("FFmpeg is stopped")
|
|
||||||
break
|
|
||||||
elif state != FFmpegState.RUNNING:
|
|
||||||
logger.warning(f"FFmpeg is in {state} state, waiting...")
|
|
||||||
await asyncio.sleep(0.5)
|
|
||||||
continue
|
|
||||||
|
|
||||||
current_time = time()
|
|
||||||
elapsed_time = math.floor((current_time - beg) * 10) / 10
|
|
||||||
buffer_size = max(int(32000 * elapsed_time), 4096)
|
|
||||||
beg = current_time
|
|
||||||
|
|
||||||
chunk = await self.ffmpeg_manager.read_data(buffer_size)
|
|
||||||
|
|
||||||
if not chunk:
|
|
||||||
if self.is_stopping:
|
|
||||||
logger.info("FFmpeg stdout closed, stopping.")
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
# No data available, but not stopping - FFmpeg might be restarting
|
|
||||||
await asyncio.sleep(0.1)
|
|
||||||
continue
|
|
||||||
|
|
||||||
self.pcm_buffer.extend(chunk)
|
|
||||||
await self.handle_pcm_data()
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(f"Exception in ffmpeg_stdout_reader: {e}")
|
|
||||||
logger.warning(f"Traceback: {traceback.format_exc()}")
|
|
||||||
# Try to recover by waiting a bit
|
|
||||||
await asyncio.sleep(1)
|
|
||||||
|
|
||||||
# Check if we should exit
|
|
||||||
if self.is_stopping:
|
|
||||||
break
|
|
||||||
|
|
||||||
logger.info("FFmpeg stdout processing finished. Signaling downstream processors.")
|
|
||||||
if self.args.transcription and self.transcription_queue:
|
|
||||||
await self.transcription_queue.put(SENTINEL)
|
|
||||||
logger.debug("Sentinel put into transcription_queue.")
|
|
||||||
if self.args.diarization and self.diarization_queue:
|
|
||||||
await self.diarization_queue.put(SENTINEL)
|
|
||||||
logger.debug("Sentinel put into diarization_queue.")
|
|
||||||
if self.args.target_language and self.translation_queue:
|
|
||||||
await self.translation_queue.put(SENTINEL)
|
|
||||||
|
|
||||||
|
|
||||||
async def transcription_processor(self):
|
async def transcription_processor(self):
|
||||||
"""Process audio chunks for transcription."""
|
"""Process audio chunks for transcription."""
|
||||||
self.sep = self.online.asr.sep
|
|
||||||
cumulative_pcm_duration_stream_time = 0.0
|
cumulative_pcm_duration_stream_time = 0.0
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
@@ -276,7 +196,7 @@ class AudioProcessor:
|
|||||||
stream_time_end_of_current_pcm = cumulative_pcm_duration_stream_time
|
stream_time_end_of_current_pcm = cumulative_pcm_duration_stream_time
|
||||||
|
|
||||||
self.online.insert_audio_chunk(pcm_array, stream_time_end_of_current_pcm)
|
self.online.insert_audio_chunk(pcm_array, stream_time_end_of_current_pcm)
|
||||||
new_tokens, current_audio_processed_upto = self.online.process_iter()
|
new_tokens, current_audio_processed_upto = await asyncio.to_thread(self.online.process_iter)
|
||||||
|
|
||||||
# Get buffer information
|
# Get buffer information
|
||||||
_buffer_transcript_obj = self.online.get_buffer()
|
_buffer_transcript_obj = self.online.get_buffer()
|
||||||
@@ -300,7 +220,7 @@ class AudioProcessor:
|
|||||||
new_end_buffer = max(candidate_end_times)
|
new_end_buffer = max(candidate_end_times)
|
||||||
|
|
||||||
await self.update_transcription(
|
await self.update_transcription(
|
||||||
new_tokens, buffer_text, new_end_buffer, self.sep
|
new_tokens, buffer_text, new_end_buffer
|
||||||
)
|
)
|
||||||
|
|
||||||
if new_tokens and self.args.target_language and self.translation_queue:
|
if new_tokens and self.args.target_language and self.translation_queue:
|
||||||
@@ -314,6 +234,14 @@ class AudioProcessor:
|
|||||||
logger.warning(f"Traceback: {traceback.format_exc()}")
|
logger.warning(f"Traceback: {traceback.format_exc()}")
|
||||||
if 'pcm_array' in locals() and pcm_array is not SENTINEL : # Check if pcm_array was assigned from queue
|
if 'pcm_array' in locals() and pcm_array is not SENTINEL : # Check if pcm_array was assigned from queue
|
||||||
self.transcription_queue.task_done()
|
self.transcription_queue.task_done()
|
||||||
|
|
||||||
|
if self.is_stopping:
|
||||||
|
logger.info("Transcription processor finishing due to stopping flag.")
|
||||||
|
if self.diarization_queue:
|
||||||
|
await self.diarization_queue.put(SENTINEL)
|
||||||
|
if self.translation_queue:
|
||||||
|
await self.translation_queue.put(SENTINEL)
|
||||||
|
|
||||||
logger.info("Transcription processor task finished.")
|
logger.info("Transcription processor task finished.")
|
||||||
|
|
||||||
|
|
||||||
@@ -385,7 +313,7 @@ class AudioProcessor:
|
|||||||
tokens_to_process.append(additional_token)
|
tokens_to_process.append(additional_token)
|
||||||
if tokens_to_process:
|
if tokens_to_process:
|
||||||
online_translation.insert_tokens(tokens_to_process)
|
online_translation.insert_tokens(tokens_to_process)
|
||||||
self.translated_segments = online_translation.process()
|
self.translated_segments = await asyncio.to_thread(online_translation.process)
|
||||||
|
|
||||||
self.translation_queue.task_done()
|
self.translation_queue.task_done()
|
||||||
for _ in additional_tokens:
|
for _ in additional_tokens:
|
||||||
@@ -407,39 +335,16 @@ class AudioProcessor:
|
|||||||
|
|
||||||
async def results_formatter(self):
|
async def results_formatter(self):
|
||||||
"""Format processing results for output."""
|
"""Format processing results for output."""
|
||||||
last_sent_trans = None
|
|
||||||
last_sent_diar = None
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
ffmpeg_state = await self.ffmpeg_manager.get_state()
|
|
||||||
if ffmpeg_state == FFmpegState.FAILED and self._ffmpeg_error:
|
|
||||||
yield {
|
|
||||||
"status": "error",
|
|
||||||
"error": f"FFmpeg error: {self._ffmpeg_error}",
|
|
||||||
"lines": [],
|
|
||||||
"buffer_transcription": "",
|
|
||||||
"buffer_diarization": "",
|
|
||||||
"remaining_time_transcription": 0,
|
|
||||||
"remaining_time_diarization": 0
|
|
||||||
}
|
|
||||||
self._ffmpeg_error = None
|
|
||||||
await asyncio.sleep(1)
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Get current state
|
# Get current state
|
||||||
state = await self.get_current_state()
|
state = await self.get_current_state()
|
||||||
tokens = state["tokens"]
|
|
||||||
buffer_transcription = state["buffer_transcription"]
|
|
||||||
buffer_diarization = state["buffer_diarization"]
|
|
||||||
end_attributed_speaker = state["end_attributed_speaker"]
|
|
||||||
sep = state["sep"]
|
|
||||||
|
|
||||||
# Add dummy tokens if needed
|
# Add dummy tokens if needed
|
||||||
if (not tokens or tokens[-1].is_dummy) and not self.args.transcription and self.args.diarization:
|
if (not state.tokens or state.tokens[-1].is_dummy) and not self.args.transcription and self.args.diarization:
|
||||||
await self.add_dummy_token()
|
await self.add_dummy_token()
|
||||||
sleep(0.5)
|
sleep(0.5)
|
||||||
state = await self.get_current_state()
|
state = await self.get_current_state()
|
||||||
tokens = state["tokens"]
|
|
||||||
|
|
||||||
# Format output
|
# Format output
|
||||||
lines, undiarized_text, buffer_transcription, buffer_diarization = format_output(
|
lines, undiarized_text, buffer_transcription, buffer_diarization = format_output(
|
||||||
@@ -447,18 +352,19 @@ class AudioProcessor:
|
|||||||
self.silence,
|
self.silence,
|
||||||
current_time = time() - self.beg_loop if self.beg_loop else None,
|
current_time = time() - self.beg_loop if self.beg_loop else None,
|
||||||
args = self.args,
|
args = self.args,
|
||||||
debug = self.debug
|
debug = self.debug,
|
||||||
|
sep=self.sep
|
||||||
)
|
)
|
||||||
# Handle undiarized text
|
# Handle undiarized text
|
||||||
if undiarized_text:
|
if undiarized_text:
|
||||||
combined = sep.join(undiarized_text)
|
combined = self.sep.join(undiarized_text)
|
||||||
if buffer_transcription:
|
if buffer_transcription:
|
||||||
combined += sep
|
combined += self.sep
|
||||||
await self.update_diarization(end_attributed_speaker, combined)
|
await self.update_diarization(state.end_attributed_speaker, combined)
|
||||||
buffer_diarization = combined
|
buffer_diarization = combined
|
||||||
|
|
||||||
response_status = "active_transcription"
|
response_status = "active_transcription"
|
||||||
if not tokens and not buffer_transcription and not buffer_diarization:
|
if not state.tokens and not buffer_transcription and not buffer_diarization:
|
||||||
response_status = "no_audio_detected"
|
response_status = "no_audio_detected"
|
||||||
lines = []
|
lines = []
|
||||||
elif response_status == "active_transcription" and not lines:
|
elif response_status == "active_transcription" and not lines:
|
||||||
@@ -468,32 +374,19 @@ class AudioProcessor:
|
|||||||
end=state.get("end_buffer", 0)
|
end=state.get("end_buffer", 0)
|
||||||
)]
|
)]
|
||||||
|
|
||||||
response = {
|
response = FrontData(
|
||||||
"status": response_status,
|
status=response_status,
|
||||||
"lines": [line.to_dict() for line in lines],
|
lines=lines,
|
||||||
"buffer_transcription": buffer_transcription,
|
buffer_transcription=buffer_transcription,
|
||||||
"buffer_diarization": buffer_diarization,
|
buffer_diarization=buffer_diarization,
|
||||||
"remaining_time_transcription": state["remaining_time_transcription"],
|
remaining_time_transcription=state.remaining_time_transcription,
|
||||||
"remaining_time_diarization": state["remaining_time_diarization"] if self.args.diarization else 0
|
remaining_time_diarization=state.remaining_time_diarization if self.args.diarization else 0
|
||||||
}
|
|
||||||
|
|
||||||
current_response_signature = f"{response_status} | " + \
|
|
||||||
' '.join([f"{line.speaker} {line.text}" for line in lines]) + \
|
|
||||||
f" | {buffer_transcription} | {buffer_diarization}"
|
|
||||||
|
|
||||||
trans = state["remaining_time_transcription"]
|
|
||||||
diar = state["remaining_time_diarization"]
|
|
||||||
should_push = (
|
|
||||||
current_response_signature != self.last_response_content
|
|
||||||
or last_sent_trans is None
|
|
||||||
or round(trans, 1) != round(last_sent_trans, 1)
|
|
||||||
or round(diar, 1) != round(last_sent_diar, 1)
|
|
||||||
)
|
)
|
||||||
if should_push and (lines or buffer_transcription or buffer_diarization or response_status == "no_audio_detected" or trans > 0 or diar > 0):
|
|
||||||
|
should_push = (response != self.last_response_content)
|
||||||
|
if should_push and (lines or buffer_transcription or buffer_diarization or response_status == "no_audio_detected"):
|
||||||
yield response
|
yield response
|
||||||
self.last_response_content = current_response_signature
|
self.last_response_content = response
|
||||||
last_sent_trans = trans
|
|
||||||
last_sent_diar = diar
|
|
||||||
|
|
||||||
# Check for termination condition
|
# Check for termination condition
|
||||||
if self.is_stopping:
|
if self.is_stopping:
|
||||||
@@ -507,33 +400,18 @@ class AudioProcessor:
|
|||||||
logger.info("Results formatter: All upstream processors are done and in stopping state. Terminating.")
|
logger.info("Results formatter: All upstream processors are done and in stopping state. Terminating.")
|
||||||
return
|
return
|
||||||
|
|
||||||
await asyncio.sleep(0.1) # Avoid overwhelming the client
|
await asyncio.sleep(0.05)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Exception in results_formatter: {e}")
|
logger.warning(f"Exception in results_formatter: {e}")
|
||||||
logger.warning(f"Traceback: {traceback.format_exc()}")
|
logger.warning(f"Traceback: {traceback.format_exc()}")
|
||||||
await asyncio.sleep(0.5) # Back off on error
|
await asyncio.sleep(0.5)
|
||||||
|
|
||||||
async def create_tasks(self):
|
async def create_tasks(self):
|
||||||
"""Create and start processing tasks."""
|
"""Create and start processing tasks."""
|
||||||
self.all_tasks_for_cleanup = []
|
self.all_tasks_for_cleanup = []
|
||||||
processing_tasks_for_watchdog = []
|
processing_tasks_for_watchdog = []
|
||||||
|
|
||||||
success = await self.ffmpeg_manager.start()
|
|
||||||
if not success:
|
|
||||||
logger.error("Failed to start FFmpeg manager")
|
|
||||||
async def error_generator():
|
|
||||||
yield {
|
|
||||||
"status": "error",
|
|
||||||
"error": "FFmpeg failed to start. Please check that FFmpeg is installed.",
|
|
||||||
"lines": [],
|
|
||||||
"buffer_transcription": "",
|
|
||||||
"buffer_diarization": "",
|
|
||||||
"remaining_time_transcription": 0,
|
|
||||||
"remaining_time_diarization": 0
|
|
||||||
}
|
|
||||||
return error_generator()
|
|
||||||
|
|
||||||
if self.args.transcription and self.online:
|
if self.args.transcription and self.online:
|
||||||
self.transcription_task = asyncio.create_task(self.transcription_processor())
|
self.transcription_task = asyncio.create_task(self.transcription_processor())
|
||||||
self.all_tasks_for_cleanup.append(self.transcription_task)
|
self.all_tasks_for_cleanup.append(self.transcription_task)
|
||||||
@@ -549,10 +427,6 @@ class AudioProcessor:
|
|||||||
self.all_tasks_for_cleanup.append(self.translation_task)
|
self.all_tasks_for_cleanup.append(self.translation_task)
|
||||||
processing_tasks_for_watchdog.append(self.translation_task)
|
processing_tasks_for_watchdog.append(self.translation_task)
|
||||||
|
|
||||||
self.ffmpeg_reader_task = asyncio.create_task(self.ffmpeg_stdout_reader())
|
|
||||||
self.all_tasks_for_cleanup.append(self.ffmpeg_reader_task)
|
|
||||||
processing_tasks_for_watchdog.append(self.ffmpeg_reader_task)
|
|
||||||
|
|
||||||
# Monitor overall system health
|
# Monitor overall system health
|
||||||
self.watchdog_task = asyncio.create_task(self.watchdog(processing_tasks_for_watchdog))
|
self.watchdog_task = asyncio.create_task(self.watchdog(processing_tasks_for_watchdog))
|
||||||
self.all_tasks_for_cleanup.append(self.watchdog_task)
|
self.all_tasks_for_cleanup.append(self.watchdog_task)
|
||||||
@@ -574,15 +448,6 @@ class AudioProcessor:
|
|||||||
else:
|
else:
|
||||||
logger.info(f"{task_name} completed normally.")
|
logger.info(f"{task_name} completed normally.")
|
||||||
|
|
||||||
# Check FFmpeg status through the manager
|
|
||||||
ffmpeg_state = await self.ffmpeg_manager.get_state()
|
|
||||||
if ffmpeg_state == FFmpegState.FAILED:
|
|
||||||
logger.error("FFmpeg is in FAILED state, notifying results formatter")
|
|
||||||
# FFmpeg manager will handle its own recovery
|
|
||||||
elif ffmpeg_state == FFmpegState.STOPPED and not self.is_stopping:
|
|
||||||
logger.warning("FFmpeg unexpectedly stopped, attempting restart")
|
|
||||||
await self.ffmpeg_manager.restart()
|
|
||||||
|
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
logger.info("Watchdog task cancelled.")
|
logger.info("Watchdog task cancelled.")
|
||||||
break
|
break
|
||||||
@@ -601,8 +466,6 @@ class AudioProcessor:
|
|||||||
if created_tasks:
|
if created_tasks:
|
||||||
await asyncio.gather(*created_tasks, return_exceptions=True)
|
await asyncio.gather(*created_tasks, return_exceptions=True)
|
||||||
logger.info("All processing tasks cancelled or finished.")
|
logger.info("All processing tasks cancelled or finished.")
|
||||||
await self.ffmpeg_manager.stop()
|
|
||||||
logger.info("FFmpeg manager stopped.")
|
|
||||||
if self.args.diarization and hasattr(self, 'diarization') and hasattr(self.diarization, 'close'):
|
if self.args.diarization and hasattr(self, 'diarization') and hasattr(self.diarization, 'close'):
|
||||||
self.diarization.close()
|
self.diarization.close()
|
||||||
logger.info("AudioProcessor cleanup complete.")
|
logger.info("AudioProcessor cleanup complete.")
|
||||||
@@ -617,8 +480,10 @@ class AudioProcessor:
|
|||||||
if not message:
|
if not message:
|
||||||
logger.info("Empty audio message received, initiating stop sequence.")
|
logger.info("Empty audio message received, initiating stop sequence.")
|
||||||
self.is_stopping = True
|
self.is_stopping = True
|
||||||
# Signal FFmpeg manager to stop accepting data
|
|
||||||
await self.ffmpeg_manager.stop()
|
if self.transcription_queue:
|
||||||
|
await self.transcription_queue.put(SENTINEL)
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
||||||
if self.is_stopping:
|
if self.is_stopping:
|
||||||
@@ -628,14 +493,6 @@ class AudioProcessor:
|
|||||||
if self.is_pcm_input:
|
if self.is_pcm_input:
|
||||||
self.pcm_buffer.extend(message)
|
self.pcm_buffer.extend(message)
|
||||||
await self.handle_pcm_data()
|
await self.handle_pcm_data()
|
||||||
else:
|
|
||||||
success = await self.ffmpeg_manager.write_data(message)
|
|
||||||
if not success:
|
|
||||||
ffmpeg_state = await self.ffmpeg_manager.get_state()
|
|
||||||
if ffmpeg_state == FFmpegState.FAILED:
|
|
||||||
logger.error("FFmpeg is in FAILED state, cannot process audio")
|
|
||||||
else:
|
|
||||||
logger.warning("Failed to write audio data to FFmpeg")
|
|
||||||
|
|
||||||
async def handle_pcm_data(self):
|
async def handle_pcm_data(self):
|
||||||
# Process when enough data
|
# Process when enough data
|
||||||
|
|||||||
@@ -54,7 +54,7 @@ async def handle_websocket_results(websocket, results_generator):
|
|||||||
"""Consumes results from the audio processor and sends them via WebSocket."""
|
"""Consumes results from the audio processor and sends them via WebSocket."""
|
||||||
try:
|
try:
|
||||||
async for response in results_generator:
|
async for response in results_generator:
|
||||||
await websocket.send_json(response)
|
await websocket.send_json(response.to_dict())
|
||||||
# when the results_generator finishes it means all audio has been processed
|
# when the results_generator finishes it means all audio has been processed
|
||||||
logger.info("Results generator finished. Sending 'ready_to_stop' to client.")
|
logger.info("Results generator finished. Sending 'ready_to_stop' to client.")
|
||||||
await websocket.send_json({"type": "ready_to_stop"})
|
await websocket.send_json({"type": "ready_to_stop"})
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ try:
|
|||||||
except ImportError:
|
except ImportError:
|
||||||
from .whisper_streaming_custom.whisper_online import backend_factory
|
from .whisper_streaming_custom.whisper_online import backend_factory
|
||||||
from .whisper_streaming_custom.online_asr import OnlineASRProcessor
|
from .whisper_streaming_custom.online_asr import OnlineASRProcessor
|
||||||
from whisperlivekit.warmup import warmup_asr, warmup_online
|
from whisperlivekit.warmup import warmup_asr
|
||||||
from argparse import Namespace
|
from argparse import Namespace
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
@@ -155,7 +155,6 @@ def online_factory(args, asr, tokenizer, logfile=sys.stderr):
|
|||||||
asr,
|
asr,
|
||||||
logfile=logfile,
|
logfile=logfile,
|
||||||
)
|
)
|
||||||
# warmup_online(online, args.warmup_file)
|
|
||||||
else:
|
else:
|
||||||
online = OnlineASRProcessor(
|
online = OnlineASRProcessor(
|
||||||
asr,
|
asr,
|
||||||
|
|||||||
@@ -1,193 +0,0 @@
|
|||||||
import asyncio
|
|
||||||
import logging
|
|
||||||
from enum import Enum
|
|
||||||
from typing import Optional, Callable
|
|
||||||
import contextlib
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
|
||||||
logging.basicConfig(level=logging.INFO)
|
|
||||||
|
|
||||||
ERROR_INSTALL_INSTRUCTIONS = """
|
|
||||||
FFmpeg is not installed or not found in your system's PATH.
|
|
||||||
Please install FFmpeg to enable audio processing.
|
|
||||||
|
|
||||||
Installation instructions:
|
|
||||||
|
|
||||||
# Ubuntu/Debian:
|
|
||||||
sudo apt update && sudo apt install ffmpeg
|
|
||||||
|
|
||||||
# macOS (using Homebrew):
|
|
||||||
brew install ffmpeg
|
|
||||||
|
|
||||||
# Windows:
|
|
||||||
# 1. Download the latest static build from https://ffmpeg.org/download.html
|
|
||||||
# 2. Extract the archive (e.g., to C:\\FFmpeg).
|
|
||||||
# 3. Add the 'bin' directory (e.g., C:\\FFmpeg\\bin) to your system's PATH environment variable.
|
|
||||||
|
|
||||||
After installation, please restart the application.
|
|
||||||
"""
|
|
||||||
|
|
||||||
class FFmpegState(Enum):
|
|
||||||
STOPPED = "stopped"
|
|
||||||
STARTING = "starting"
|
|
||||||
RUNNING = "running"
|
|
||||||
RESTARTING = "restarting"
|
|
||||||
FAILED = "failed"
|
|
||||||
|
|
||||||
class FFmpegManager:
|
|
||||||
def __init__(self, sample_rate: int = 16000, channels: int = 1):
|
|
||||||
self.sample_rate = sample_rate
|
|
||||||
self.channels = channels
|
|
||||||
|
|
||||||
self.process: Optional[asyncio.subprocess.Process] = None
|
|
||||||
self._stderr_task: Optional[asyncio.Task] = None
|
|
||||||
|
|
||||||
self.on_error_callback: Optional[Callable[[str], None]] = None
|
|
||||||
|
|
||||||
self.state = FFmpegState.STOPPED
|
|
||||||
self._state_lock = asyncio.Lock()
|
|
||||||
|
|
||||||
async def start(self) -> bool:
|
|
||||||
async with self._state_lock:
|
|
||||||
if self.state != FFmpegState.STOPPED:
|
|
||||||
logger.warning(f"FFmpeg already running in state: {self.state}")
|
|
||||||
return False
|
|
||||||
self.state = FFmpegState.STARTING
|
|
||||||
|
|
||||||
try:
|
|
||||||
cmd = [
|
|
||||||
"ffmpeg",
|
|
||||||
"-hide_banner",
|
|
||||||
"-loglevel", "error",
|
|
||||||
"-i", "pipe:0",
|
|
||||||
"-f", "s16le",
|
|
||||||
"-acodec", "pcm_s16le",
|
|
||||||
"-ac", str(self.channels),
|
|
||||||
"-ar", str(self.sample_rate),
|
|
||||||
"pipe:1"
|
|
||||||
]
|
|
||||||
|
|
||||||
self.process = await asyncio.create_subprocess_exec(
|
|
||||||
*cmd,
|
|
||||||
stdin=asyncio.subprocess.PIPE,
|
|
||||||
stdout=asyncio.subprocess.PIPE,
|
|
||||||
stderr=asyncio.subprocess.PIPE
|
|
||||||
)
|
|
||||||
|
|
||||||
self._stderr_task = asyncio.create_task(self._drain_stderr())
|
|
||||||
|
|
||||||
async with self._state_lock:
|
|
||||||
self.state = FFmpegState.RUNNING
|
|
||||||
|
|
||||||
logger.info("FFmpeg started.")
|
|
||||||
return True
|
|
||||||
|
|
||||||
except FileNotFoundError:
|
|
||||||
logger.error(ERROR_INSTALL_INSTRUCTIONS)
|
|
||||||
async with self._state_lock:
|
|
||||||
self.state = FFmpegState.FAILED
|
|
||||||
if self.on_error_callback:
|
|
||||||
await self.on_error_callback("ffmpeg_not_found")
|
|
||||||
return False
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error starting FFmpeg: {e}")
|
|
||||||
async with self._state_lock:
|
|
||||||
self.state = FFmpegState.FAILED
|
|
||||||
if self.on_error_callback:
|
|
||||||
await self.on_error_callback("start_failed")
|
|
||||||
return False
|
|
||||||
|
|
||||||
async def stop(self):
|
|
||||||
async with self._state_lock:
|
|
||||||
if self.state == FFmpegState.STOPPED:
|
|
||||||
return
|
|
||||||
self.state = FFmpegState.STOPPED
|
|
||||||
|
|
||||||
if self.process:
|
|
||||||
if self.process.stdin and not self.process.stdin.is_closing():
|
|
||||||
self.process.stdin.close()
|
|
||||||
await self.process.stdin.wait_closed()
|
|
||||||
await self.process.wait()
|
|
||||||
self.process = None
|
|
||||||
|
|
||||||
if self._stderr_task:
|
|
||||||
self._stderr_task.cancel()
|
|
||||||
with contextlib.suppress(asyncio.CancelledError):
|
|
||||||
await self._stderr_task
|
|
||||||
|
|
||||||
logger.info("FFmpeg stopped.")
|
|
||||||
|
|
||||||
async def write_data(self, data: bytes) -> bool:
|
|
||||||
async with self._state_lock:
|
|
||||||
if self.state != FFmpegState.RUNNING:
|
|
||||||
logger.warning(f"Cannot write, FFmpeg state: {self.state}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
try:
|
|
||||||
self.process.stdin.write(data)
|
|
||||||
await self.process.stdin.drain()
|
|
||||||
return True
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error writing to FFmpeg: {e}")
|
|
||||||
if self.on_error_callback:
|
|
||||||
await self.on_error_callback("write_error")
|
|
||||||
return False
|
|
||||||
|
|
||||||
async def read_data(self, size: int) -> Optional[bytes]:
|
|
||||||
async with self._state_lock:
|
|
||||||
if self.state != FFmpegState.RUNNING:
|
|
||||||
logger.warning(f"Cannot read, FFmpeg state: {self.state}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
try:
|
|
||||||
data = await asyncio.wait_for(
|
|
||||||
self.process.stdout.read(size),
|
|
||||||
timeout=20.0
|
|
||||||
)
|
|
||||||
return data
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
logger.warning("FFmpeg read timeout.")
|
|
||||||
return None
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error reading from FFmpeg: {e}")
|
|
||||||
if self.on_error_callback:
|
|
||||||
await self.on_error_callback("read_error")
|
|
||||||
return None
|
|
||||||
|
|
||||||
async def get_state(self) -> FFmpegState:
|
|
||||||
async with self._state_lock:
|
|
||||||
return self.state
|
|
||||||
|
|
||||||
async def restart(self) -> bool:
|
|
||||||
async with self._state_lock:
|
|
||||||
if self.state == FFmpegState.RESTARTING:
|
|
||||||
logger.warning("Restart already in progress.")
|
|
||||||
return False
|
|
||||||
self.state = FFmpegState.RESTARTING
|
|
||||||
|
|
||||||
logger.info("Restarting FFmpeg...")
|
|
||||||
|
|
||||||
try:
|
|
||||||
await self.stop()
|
|
||||||
await asyncio.sleep(1) # short delay before restarting
|
|
||||||
return await self.start()
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error during FFmpeg restart: {e}")
|
|
||||||
async with self._state_lock:
|
|
||||||
self.state = FFmpegState.FAILED
|
|
||||||
if self.on_error_callback:
|
|
||||||
await self.on_error_callback("restart_failed")
|
|
||||||
return False
|
|
||||||
|
|
||||||
async def _drain_stderr(self):
|
|
||||||
try:
|
|
||||||
while True:
|
|
||||||
line = await self.process.stderr.readline()
|
|
||||||
if not line:
|
|
||||||
break
|
|
||||||
logger.debug(f"FFmpeg stderr: {line.decode(errors='ignore').strip()}")
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
logger.info("FFmpeg stderr drain task cancelled.")
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error draining FFmpeg stderr: {e}")
|
|
||||||
@@ -20,7 +20,7 @@ def parse_args():
|
|||||||
help="""
|
help="""
|
||||||
The path to a speech audio wav file to warm up Whisper so that the very first chunk processing is fast.
|
The path to a speech audio wav file to warm up Whisper so that the very first chunk processing is fast.
|
||||||
If not set, uses https://github.com/ggerganov/whisper.cpp/raw/master/samples/jfk.wav.
|
If not set, uses https://github.com/ggerganov/whisper.cpp/raw/master/samples/jfk.wav.
|
||||||
If False, no warmup is performed.
|
If empty, no warmup is performed.
|
||||||
""",
|
""",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -46,15 +46,14 @@ def append_token_to_last_line(lines, sep, token, debug_info):
|
|||||||
lines[-1].text += sep + token.text + debug_info
|
lines[-1].text += sep + token.text + debug_info
|
||||||
lines[-1].end = token.end
|
lines[-1].end = token.end
|
||||||
|
|
||||||
def format_output(state, silence, current_time, args, debug):
|
def format_output(state, silence, current_time, args, debug, sep):
|
||||||
diarization = args.diarization
|
diarization = args.diarization
|
||||||
disable_punctuation_split = args.disable_punctuation_split
|
disable_punctuation_split = args.disable_punctuation_split
|
||||||
tokens = state["tokens"]
|
tokens = state.tokens
|
||||||
translated_segments = state["translated_segments"] # Here we will attribute the speakers only based on the timestamps of the segments
|
translated_segments = state.translated_segments # Here we will attribute the speakers only based on the timestamps of the segments
|
||||||
buffer_transcription = state["buffer_transcription"]
|
buffer_transcription = state.buffer_transcription
|
||||||
buffer_diarization = state["buffer_diarization"]
|
buffer_diarization = state.buffer_diarization
|
||||||
end_attributed_speaker = state["end_attributed_speaker"]
|
end_attributed_speaker = state.end_attributed_speaker
|
||||||
sep = state["sep"]
|
|
||||||
|
|
||||||
previous_speaker = -1
|
previous_speaker = -1
|
||||||
lines = []
|
lines = []
|
||||||
@@ -128,7 +127,7 @@ def format_output(state, silence, current_time, args, debug):
|
|||||||
for line in lines:
|
for line in lines:
|
||||||
while cts_idx < len(translated_segments):
|
while cts_idx < len(translated_segments):
|
||||||
ts = translated_segments[cts_idx]
|
ts = translated_segments[cts_idx]
|
||||||
if ts.start and ts.start >= line.start and ts.end <= line.end:
|
if ts and ts.start and ts.start >= line.start and ts.end <= line.end:
|
||||||
line.translation += ts.text + ' '
|
line.translation += ts.text + ' '
|
||||||
cts_idx += 1
|
cts_idx += 1
|
||||||
else:
|
else:
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
from dataclasses import dataclass
|
from dataclasses import dataclass, field
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
|
|
||||||
@@ -58,3 +58,37 @@ class Line(TimedText):
|
|||||||
'start': format_time(self.start),
|
'start': format_time(self.start),
|
||||||
'end': format_time(self.end),
|
'end': format_time(self.end),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class FrontData():
|
||||||
|
status: str = ''
|
||||||
|
error: str = ''
|
||||||
|
lines: list[Line] = field(default_factory=list)
|
||||||
|
buffer_transcription: str = ''
|
||||||
|
buffer_diarization: str = ''
|
||||||
|
remaining_time_transcription: float = 0.
|
||||||
|
remaining_time_diarization: float = 0.
|
||||||
|
|
||||||
|
def to_dict(self):
|
||||||
|
_dict = {
|
||||||
|
'status': self.status,
|
||||||
|
'lines': [line.to_dict() for line in self.lines],
|
||||||
|
'buffer_transcription': self.buffer_transcription,
|
||||||
|
'buffer_diarization': self.buffer_diarization,
|
||||||
|
'remaining_time_transcription': self.remaining_time_transcription,
|
||||||
|
'remaining_time_diarization': self.remaining_time_diarization,
|
||||||
|
}
|
||||||
|
if self.error:
|
||||||
|
_dict['error'] = self.error
|
||||||
|
return _dict
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class State():
|
||||||
|
tokens: list
|
||||||
|
translated_segments: list
|
||||||
|
buffer_transcription: str
|
||||||
|
buffer_diarization: str
|
||||||
|
end_buffer: float
|
||||||
|
end_attributed_speaker: float
|
||||||
|
remaining_time_transcription: float
|
||||||
|
remaining_time_diarization: float
|
||||||
@@ -6,57 +6,46 @@ logger = logging.getLogger(__name__)
|
|||||||
def load_file(warmup_file=None, timeout=5):
|
def load_file(warmup_file=None, timeout=5):
|
||||||
import os
|
import os
|
||||||
import tempfile
|
import tempfile
|
||||||
|
import urllib.request
|
||||||
import librosa
|
import librosa
|
||||||
|
|
||||||
if warmup_file is None:
|
if warmup_file == "":
|
||||||
|
logger.info(f"Skipping warmup.")
|
||||||
|
return None
|
||||||
|
|
||||||
# Download JFK sample if not already present
|
# Download JFK sample if not already present
|
||||||
|
if warmup_file is None:
|
||||||
jfk_url = "https://github.com/ggerganov/whisper.cpp/raw/master/samples/jfk.wav"
|
jfk_url = "https://github.com/ggerganov/whisper.cpp/raw/master/samples/jfk.wav"
|
||||||
temp_dir = tempfile.gettempdir()
|
temp_dir = tempfile.gettempdir()
|
||||||
warmup_file = os.path.join(temp_dir, "whisper_warmup_jfk.wav")
|
warmup_file = os.path.join(temp_dir, "whisper_warmup_jfk.wav")
|
||||||
|
if not os.path.exists(warmup_file) or os.path.getsize(warmup_file) == 0:
|
||||||
if not os.path.exists(warmup_file):
|
try:
|
||||||
logger.debug(f"Downloading warmup file from {jfk_url}")
|
logger.debug(f"Downloading warmup file from {jfk_url}")
|
||||||
print(f"Downloading warmup file from {jfk_url}")
|
with urllib.request.urlopen(jfk_url, timeout=timeout) as r, open(warmup_file, "wb") as f:
|
||||||
import time
|
f.write(r.read())
|
||||||
import urllib.request
|
|
||||||
import urllib.error
|
|
||||||
import socket
|
|
||||||
|
|
||||||
original_timeout = socket.getdefaulttimeout()
|
|
||||||
socket.setdefaulttimeout(timeout)
|
|
||||||
|
|
||||||
start_time = time.time()
|
|
||||||
try:
|
|
||||||
urllib.request.urlretrieve(jfk_url, warmup_file)
|
|
||||||
logger.debug(f"Download successful in {time.time() - start_time:.2f}s")
|
|
||||||
except (urllib.error.URLError, socket.timeout) as e:
|
|
||||||
logger.warning(f"Download failed: {e}. Proceeding without warmup.")
|
|
||||||
return None
|
|
||||||
finally:
|
|
||||||
socket.setdefaulttimeout(original_timeout)
|
|
||||||
elif not warmup_file:
|
|
||||||
return None
|
|
||||||
|
|
||||||
if not warmup_file or not os.path.exists(warmup_file) or os.path.getsize(warmup_file) == 0:
|
|
||||||
logger.warning(f"Warmup file {warmup_file} invalid or missing.")
|
|
||||||
return None
|
|
||||||
|
|
||||||
try:
|
|
||||||
audio, sr = librosa.load(warmup_file, sr=16000)
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Failed to load audio file: {e}")
|
logger.warning(f"Warmup file download failed: {e}.")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
# Validate file and load
|
||||||
|
if not os.path.exists(warmup_file) or os.path.getsize(warmup_file) == 0:
|
||||||
|
logger.warning(f"Warmup file {warmup_file} is invalid or missing.")
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
audio, _ = librosa.load(warmup_file, sr=16000)
|
||||||
return audio
|
return audio
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Failed to load warmup file: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
def warmup_asr(asr, warmup_file=None, timeout=5):
|
def warmup_asr(asr, warmup_file=None, timeout=5):
|
||||||
"""
|
"""
|
||||||
Warmup the ASR model by transcribing a short audio file.
|
Warmup the ASR model by transcribing a short audio file.
|
||||||
"""
|
"""
|
||||||
audio = load_file(warmup_file=None, timeout=5)
|
audio = load_file(warmup_file=warmup_file, timeout=timeout)
|
||||||
|
if audio is None:
|
||||||
|
logger.warning("Warmup file unavailable. Skipping ASR warmup.")
|
||||||
|
return
|
||||||
asr.transcribe(audio)
|
asr.transcribe(audio)
|
||||||
logger.info("ASR model is warmed up")
|
logger.info("ASR model is warmed up.")
|
||||||
|
|
||||||
def warmup_online(online, warmup_file=None, timeout=5):
|
|
||||||
audio = load_file(warmup_file=None, timeout=5)
|
|
||||||
online.warmup(audio)
|
|
||||||
logger.warning("ASR is warmed up")
|
|
||||||
@@ -12,6 +12,8 @@ let timerInterval = null;
|
|||||||
let audioContext = null;
|
let audioContext = null;
|
||||||
let analyser = null;
|
let analyser = null;
|
||||||
let microphone = null;
|
let microphone = null;
|
||||||
|
let workletNode = null;
|
||||||
|
let recorderWorker = null;
|
||||||
let waveCanvas = document.getElementById("waveCanvas");
|
let waveCanvas = document.getElementById("waveCanvas");
|
||||||
let waveCtx = waveCanvas.getContext("2d");
|
let waveCtx = waveCanvas.getContext("2d");
|
||||||
let animationFrame = null;
|
let animationFrame = null;
|
||||||
@@ -457,13 +459,38 @@ async function startRecording() {
|
|||||||
microphone = audioContext.createMediaStreamSource(stream);
|
microphone = audioContext.createMediaStreamSource(stream);
|
||||||
microphone.connect(analyser);
|
microphone.connect(analyser);
|
||||||
|
|
||||||
recorder = new MediaRecorder(stream, { mimeType: "audio/webm" });
|
if (!audioContext.audioWorklet) {
|
||||||
recorder.ondataavailable = (e) => {
|
throw new Error("AudioWorklet is not supported in this browser");
|
||||||
|
}
|
||||||
|
await audioContext.audioWorklet.addModule("/web/pcm_worklet.js");
|
||||||
|
workletNode = new AudioWorkletNode(audioContext, "pcm-forwarder", { numberOfInputs: 1, numberOfOutputs: 0, channelCount: 1 });
|
||||||
|
microphone.connect(workletNode);
|
||||||
|
|
||||||
|
recorderWorker = new Worker("/web/recorder_worker.js");
|
||||||
|
recorderWorker.postMessage({
|
||||||
|
command: "init",
|
||||||
|
config: {
|
||||||
|
sampleRate: audioContext.sampleRate,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
recorderWorker.onmessage = (e) => {
|
||||||
if (websocket && websocket.readyState === WebSocket.OPEN) {
|
if (websocket && websocket.readyState === WebSocket.OPEN) {
|
||||||
websocket.send(e.data);
|
websocket.send(e.data.buffer);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
recorder.start(chunkDuration);
|
|
||||||
|
workletNode.port.onmessage = (e) => {
|
||||||
|
const data = e.data;
|
||||||
|
const ab = data instanceof ArrayBuffer ? data : data.buffer;
|
||||||
|
recorderWorker.postMessage(
|
||||||
|
{
|
||||||
|
command: "record",
|
||||||
|
buffer: ab,
|
||||||
|
},
|
||||||
|
[ab]
|
||||||
|
);
|
||||||
|
};
|
||||||
|
|
||||||
startTime = Date.now();
|
startTime = Date.now();
|
||||||
timerInterval = setInterval(updateTimer, 1000);
|
timerInterval = setInterval(updateTimer, 1000);
|
||||||
@@ -501,9 +528,19 @@ async function stopRecording() {
|
|||||||
statusText.textContent = "Recording stopped. Processing final audio...";
|
statusText.textContent = "Recording stopped. Processing final audio...";
|
||||||
}
|
}
|
||||||
|
|
||||||
if (recorder) {
|
if (recorderWorker) {
|
||||||
recorder.stop();
|
recorderWorker.terminate();
|
||||||
recorder = null;
|
recorderWorker = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (workletNode) {
|
||||||
|
try {
|
||||||
|
workletNode.port.onmessage = null;
|
||||||
|
} catch (e) {}
|
||||||
|
try {
|
||||||
|
workletNode.disconnect();
|
||||||
|
} catch (e) {}
|
||||||
|
workletNode = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (microphone) {
|
if (microphone) {
|
||||||
|
|||||||
16
whisperlivekit/web/pcm_worklet.js
Normal file
16
whisperlivekit/web/pcm_worklet.js
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
class PCMForwarder extends AudioWorkletProcessor {
|
||||||
|
process(inputs) {
|
||||||
|
const input = inputs[0];
|
||||||
|
if (input && input[0] && input[0].length) {
|
||||||
|
// Forward mono channel (0). If multi-channel, downmixing can be added here.
|
||||||
|
const channelData = input[0];
|
||||||
|
const copy = new Float32Array(channelData.length);
|
||||||
|
copy.set(channelData);
|
||||||
|
this.port.postMessage(copy, [copy.buffer]);
|
||||||
|
}
|
||||||
|
// Keep processor alive
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
registerProcessor('pcm-forwarder', PCMForwarder);
|
||||||
58
whisperlivekit/web/recorder_worker.js
Normal file
58
whisperlivekit/web/recorder_worker.js
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
let sampleRate = 48000;
|
||||||
|
let targetSampleRate = 16000;
|
||||||
|
|
||||||
|
self.onmessage = function (e) {
|
||||||
|
switch (e.data.command) {
|
||||||
|
case 'init':
|
||||||
|
init(e.data.config);
|
||||||
|
break;
|
||||||
|
case 'record':
|
||||||
|
record(e.data.buffer);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
function init(config) {
|
||||||
|
sampleRate = config.sampleRate;
|
||||||
|
targetSampleRate = config.targetSampleRate || 16000;
|
||||||
|
}
|
||||||
|
|
||||||
|
function record(inputBuffer) {
|
||||||
|
const buffer = new Float32Array(inputBuffer);
|
||||||
|
const resampledBuffer = resample(buffer, sampleRate, targetSampleRate);
|
||||||
|
const pcmBuffer = toPCM(resampledBuffer);
|
||||||
|
self.postMessage({ buffer: pcmBuffer }, [pcmBuffer]);
|
||||||
|
}
|
||||||
|
|
||||||
|
function resample(buffer, from, to) {
|
||||||
|
if (from === to) {
|
||||||
|
return buffer;
|
||||||
|
}
|
||||||
|
const ratio = from / to;
|
||||||
|
const newLength = Math.round(buffer.length / ratio);
|
||||||
|
const result = new Float32Array(newLength);
|
||||||
|
let offsetResult = 0;
|
||||||
|
let offsetBuffer = 0;
|
||||||
|
while (offsetResult < result.length) {
|
||||||
|
const nextOffsetBuffer = Math.round((offsetResult + 1) * ratio);
|
||||||
|
let accum = 0, count = 0;
|
||||||
|
for (let i = offsetBuffer; i < nextOffsetBuffer && i < buffer.length; i++) {
|
||||||
|
accum += buffer[i];
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
result[offsetResult] = accum / count;
|
||||||
|
offsetResult++;
|
||||||
|
offsetBuffer = nextOffsetBuffer;
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
function toPCM(input) {
|
||||||
|
const buffer = new ArrayBuffer(input.length * 2);
|
||||||
|
const view = new DataView(buffer);
|
||||||
|
for (let i = 0; i < input.length; i++) {
|
||||||
|
const s = Math.max(-1, Math.min(1, input[i]));
|
||||||
|
view.setInt16(i * 2, s < 0 ? s * 0x8000 : s * 0x7FFF, true);
|
||||||
|
}
|
||||||
|
return buffer;
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user