Number of speakers not anymore limited to 10; a speaker has been created for "being processed" (-1), and another one for no" speaker detected" (-2)

This commit is contained in:
Quentin Fuxa
2025-02-19 11:25:59 +01:00
parent 6121083549
commit dc24366580

View File

@@ -5,6 +5,11 @@ from rx.subject import Subject
import threading
import numpy as np
import asyncio
import re
def extract_number(s):
match = re.search(r'\d+', s)
return int(match.group()) if match else None
class WebSocketAudioSource(AudioSource):
"""
@@ -44,37 +49,48 @@ def create_pipeline(SAMPLE_RATE):
return inference, ws_source
def init_diart(SAMPLE_RATE):
inference, ws_source = create_pipeline(SAMPLE_RATE)
def init_diart(SAMPLE_RATE, diar_instance):
diar_pipeline = SpeakerDiarization()
ws_source = WebSocketAudioSource(uri="websocket_source", sample_rate=SAMPLE_RATE)
inference = StreamingInference(
pipeline=diar_pipeline,
source=ws_source,
do_plot=False,
show_progress=False,
)
l_speakers_queue = asyncio.Queue()
def diar_hook(result):
"""
Hook called each time Diart processes a chunk.
result is (annotation, audio).
We store the label of the last segment in 'current_speaker'.
For each detected speaker segment, push its info to the queue and update processed_time.
"""
global l_speakers
l_speakers = []
annotation, audio = result
for speaker in annotation._labels:
segments_beg = annotation._labels[speaker].segments_boundaries_[0]
segments_end = annotation._labels[speaker].segments_boundaries_[-1]
asyncio.create_task(
l_speakers_queue.put({"speaker": speaker, "beg": segments_beg, "end": segments_end})
)
if annotation._labels:
for speaker in annotation._labels:
segments_beg = annotation._labels[speaker].segments_boundaries_[0]
segments_end = annotation._labels[speaker].segments_boundaries_[-1]
if segments_end > diar_instance.processed_time:
diar_instance.processed_time = segments_end
asyncio.create_task(
l_speakers_queue.put({"speaker": speaker, "beg": segments_beg, "end": segments_end})
)
else:
audio_duration = audio.extent.end
if audio_duration > diar_instance.processed_time:
diar_instance.processed_time = audio_duration
l_speakers_queue = asyncio.Queue()
inference.attach_hooks(diar_hook)
# Launch Diart in a background thread
loop = asyncio.get_event_loop()
diar_future = loop.run_in_executor(None, inference)
return inference, l_speakers_queue, ws_source
class DiartDiarization():
class DiartDiarization:
def __init__(self, SAMPLE_RATE):
self.inference, self.l_speakers_queue, self.ws_source = init_diart(SAMPLE_RATE)
self.processed_time = 0
self.inference, self.l_speakers_queue, self.ws_source = init_diart(SAMPLE_RATE, self)
self.segment_speakers = []
async def diarize(self, pcm_array):
@@ -82,20 +98,21 @@ class DiartDiarization():
self.segment_speakers = []
while not self.l_speakers_queue.empty():
self.segment_speakers.append(await self.l_speakers_queue.get())
def close(self):
self.ws_source.close()
def assign_speakers_to_chunks(self, chunks):
"""
Go through each chunk and see which speaker(s) overlap
that chunk's time range in the Diart annotation.
Then store the speaker label(s) (or choose the most overlapping).
This modifies `chunks` in-place or returns a new list with assigned speakers.
For each chunk (a dict with keys "beg" and "end"), assign a speaker label.
- If a chunk overlaps with a detected speaker segment, assign that label.
- If the chunk's end time is within the processed time and no speaker was assigned,
mark it as "No speaker".
- If the chunk's time hasn't been fully processed yet, leave it (or mark as "Processing").
"""
if not self.segment_speakers:
return chunks
for ch in chunks:
ch["speaker"] = ch.get("speaker", -1)
for segment in self.segment_speakers:
seg_beg = segment["beg"]
@@ -104,7 +121,10 @@ class DiartDiarization():
for ch in chunks:
if seg_end <= ch["beg"] or seg_beg >= ch["end"]:
continue
# We have overlap. Let's just pick the speaker (could be more precise in a more complex implementation)
ch["speaker"] = speaker
ch["speaker"] = extract_number(speaker) + 1
if self.processed_time > 0:
for ch in chunks:
if ch["end"] <= self.processed_time and ch["speaker"] == -1:
ch["speaker"] = -2
return chunks
return chunks