translator takes all the tokens from the queue

This commit is contained in:
Quentin Fuxa
2025-09-09 19:55:39 +02:00
parent da8726b2cb
commit add7ea07ee
6 changed files with 105 additions and 12 deletions

View File

@@ -16,6 +16,17 @@ logger.setLevel(logging.DEBUG)
SENTINEL = object() # unique sentinel object for end of stream marker
async def get_all_from_queue(queue):
items = []
try:
while True:
item = queue.get_nowait()
items.append(item)
except asyncio.QueueEmpty:
pass
return items
class AudioProcessor:
"""
Processes audio streams for transcription and diarization.
@@ -265,6 +276,8 @@ class AudioProcessor:
if self.args.diarization and self.diarization_queue:
await self.diarization_queue.put(SENTINEL)
logger.debug("Sentinel put into diarization_queue.")
if self.args.target_language and self.translation_queue:
await self.translation_queue.put(SENTINEL)
async def transcription_processor(self):
@@ -308,9 +321,6 @@ class AudioProcessor:
cumulative_pcm_duration_stream_time += duration_this_chunk
stream_time_end_of_current_pcm = cumulative_pcm_duration_stream_time
self.online.insert_audio_chunk(pcm_array, stream_time_end_of_current_pcm)
new_tokens, current_audio_processed_upto = self.online.process_iter()
@@ -338,6 +348,11 @@ class AudioProcessor:
await self.update_transcription(
new_tokens, buffer_text, new_end_buffer, self.sep
)
if new_tokens and self.args.target_language and self.translation_queue:
for token in new_tokens:
await self.translation_queue.put(token)
self.transcription_queue.task_done()
except Exception as e:
@@ -398,9 +413,44 @@ class AudioProcessor:
# in the future we want to have different languages for each speaker etc, so it will be more complex.
while True:
try:
item = await self.translation_queue.get()
token = await self.translation_queue.get() #block until at least 1 token
if token is SENTINEL:
logger.debug("Translation processor received sentinel. Finishing.")
self.translation_queue.task_done()
break
# get all the available tokens for translation. The more words, the more precise
tokens_to_process = [token]
additional_tokens = await get_all_from_queue(self.translation_queue)
sentinel_found = False
for additional_token in additional_tokens:
if additional_token is SENTINEL:
sentinel_found = True
break
tokens_to_process.append(additional_token)
if tokens_to_process:
online_translation.insert_tokens(tokens_to_process)
translations = online_translation.process()
print(translations)
self.translation_queue.task_done()
for _ in additional_tokens:
self.translation_queue.task_done()
if sentinel_found:
logger.debug("Translation processor received sentinel in batch. Finishing.")
break
except Exception as e:
logger.warning(f"Exception in translation_processor: {e}")
logger.warning(f"Traceback: {traceback.format_exc()}")
if 'token' in locals() and token is not SENTINEL:
self.translation_queue.task_done()
if 'additional_tokens' in locals():
for _ in additional_tokens:
self.translation_queue.task_done()
logger.info("Translation processor task finished.")
async def results_formatter(self):
"""Format processing results for output."""
@@ -546,8 +596,10 @@ class AudioProcessor:
self.all_tasks_for_cleanup.append(self.diarization_task)
processing_tasks_for_watchdog.append(self.diarization_task)
if self.args.target_language and self.args.language != 'auto':
if self.args.target_language and self.args.lan != 'auto':
self.translation_task = asyncio.create_task(self.translation_processor(self.online_translation))
self.all_tasks_for_cleanup.append(self.translation_task)
processing_tasks_for_watchdog.append(self.translation_task)
self.ffmpeg_reader_task = asyncio.create_task(self.ffmpeg_stdout_reader())
self.all_tasks_for_cleanup.append(self.ffmpeg_reader_task)