From dc24366580dec71c0452e0f2d6205430c8aac9fb Mon Sep 17 00:00:00 2001 From: Quentin Fuxa Date: Wed, 19 Feb 2025 11:25:59 +0100 Subject: [PATCH] Number of speakers not anymore limited to 10; a speaker has been created for "being processed" (-1), and another one for no" speaker detected" (-2) --- src/diarization/diarization_online.py | 76 +++++++++++++++++---------- 1 file changed, 48 insertions(+), 28 deletions(-) diff --git a/src/diarization/diarization_online.py b/src/diarization/diarization_online.py index 432e104..8240408 100644 --- a/src/diarization/diarization_online.py +++ b/src/diarization/diarization_online.py @@ -5,6 +5,11 @@ from rx.subject import Subject import threading import numpy as np import asyncio +import re + +def extract_number(s): + match = re.search(r'\d+', s) + return int(match.group()) if match else None class WebSocketAudioSource(AudioSource): """ @@ -44,37 +49,48 @@ def create_pipeline(SAMPLE_RATE): return inference, ws_source -def init_diart(SAMPLE_RATE): - inference, ws_source = create_pipeline(SAMPLE_RATE) +def init_diart(SAMPLE_RATE, diar_instance): + diar_pipeline = SpeakerDiarization() + ws_source = WebSocketAudioSource(uri="websocket_source", sample_rate=SAMPLE_RATE) + inference = StreamingInference( + pipeline=diar_pipeline, + source=ws_source, + do_plot=False, + show_progress=False, + ) + + l_speakers_queue = asyncio.Queue() def diar_hook(result): """ Hook called each time Diart processes a chunk. result is (annotation, audio). - We store the label of the last segment in 'current_speaker'. + For each detected speaker segment, push its info to the queue and update processed_time. """ - global l_speakers - l_speakers = [] annotation, audio = result - for speaker in annotation._labels: - segments_beg = annotation._labels[speaker].segments_boundaries_[0] - segments_end = annotation._labels[speaker].segments_boundaries_[-1] - asyncio.create_task( - l_speakers_queue.put({"speaker": speaker, "beg": segments_beg, "end": segments_end}) - ) + if annotation._labels: + for speaker in annotation._labels: + segments_beg = annotation._labels[speaker].segments_boundaries_[0] + segments_end = annotation._labels[speaker].segments_boundaries_[-1] + if segments_end > diar_instance.processed_time: + diar_instance.processed_time = segments_end + asyncio.create_task( + l_speakers_queue.put({"speaker": speaker, "beg": segments_beg, "end": segments_end}) + ) + else: + audio_duration = audio.extent.end + if audio_duration > diar_instance.processed_time: + diar_instance.processed_time = audio_duration - l_speakers_queue = asyncio.Queue() inference.attach_hooks(diar_hook) - - # Launch Diart in a background thread loop = asyncio.get_event_loop() diar_future = loop.run_in_executor(None, inference) return inference, l_speakers_queue, ws_source - -class DiartDiarization(): +class DiartDiarization: def __init__(self, SAMPLE_RATE): - self.inference, self.l_speakers_queue, self.ws_source = init_diart(SAMPLE_RATE) + self.processed_time = 0 + self.inference, self.l_speakers_queue, self.ws_source = init_diart(SAMPLE_RATE, self) self.segment_speakers = [] async def diarize(self, pcm_array): @@ -82,20 +98,21 @@ class DiartDiarization(): self.segment_speakers = [] while not self.l_speakers_queue.empty(): self.segment_speakers.append(await self.l_speakers_queue.get()) - + def close(self): self.ws_source.close() - def assign_speakers_to_chunks(self, chunks): """ - Go through each chunk and see which speaker(s) overlap - that chunk's time range in the Diart annotation. - Then store the speaker label(s) (or choose the most overlapping). - This modifies `chunks` in-place or returns a new list with assigned speakers. + For each chunk (a dict with keys "beg" and "end"), assign a speaker label. + + - If a chunk overlaps with a detected speaker segment, assign that label. + - If the chunk's end time is within the processed time and no speaker was assigned, + mark it as "No speaker". + - If the chunk's time hasn't been fully processed yet, leave it (or mark as "Processing"). """ - if not self.segment_speakers: - return chunks + for ch in chunks: + ch["speaker"] = ch.get("speaker", -1) for segment in self.segment_speakers: seg_beg = segment["beg"] @@ -104,7 +121,10 @@ class DiartDiarization(): for ch in chunks: if seg_end <= ch["beg"] or seg_beg >= ch["end"]: continue - # We have overlap. Let's just pick the speaker (could be more precise in a more complex implementation) - ch["speaker"] = speaker + ch["speaker"] = extract_number(speaker) + 1 + if self.processed_time > 0: + for ch in chunks: + if ch["end"] <= self.processed_time and ch["speaker"] == -1: + ch["speaker"] = -2 - return chunks + return chunks \ No newline at end of file