From e3afaca1a61de4a821518024599fee0c9dcff228 Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Fri, 20 Mar 2026 10:28:28 +0530 Subject: [PATCH] refactor(android): route talk playback through gateway --- .../ai/openclaw/app/voice/TalkModeManager.kt | 943 ++---------------- 1 file changed, 106 insertions(+), 837 deletions(-) diff --git a/apps/android/app/src/main/java/ai/openclaw/app/voice/TalkModeManager.kt b/apps/android/app/src/main/java/ai/openclaw/app/voice/TalkModeManager.kt index 70b6113fc35..4ba2c2ef043 100644 --- a/apps/android/app/src/main/java/ai/openclaw/app/voice/TalkModeManager.kt +++ b/apps/android/app/src/main/java/ai/openclaw/app/voice/TalkModeManager.kt @@ -6,9 +6,7 @@ import android.content.Intent import android.content.pm.PackageManager import android.media.AudioAttributes import android.media.AudioFocusRequest -import android.media.AudioFormat import android.media.AudioManager -import android.media.AudioTrack import android.media.MediaPlayer import android.os.Bundle import android.os.Handler @@ -17,16 +15,12 @@ import android.os.SystemClock import android.speech.RecognitionListener import android.speech.RecognizerIntent import android.speech.SpeechRecognizer -import android.speech.tts.TextToSpeech -import android.speech.tts.UtteranceProgressListener +import android.util.Base64 import android.util.Log import androidx.core.content.ContextCompat import ai.openclaw.app.gateway.GatewaySession import ai.openclaw.app.isCanonicalMainSessionKey -import ai.openclaw.app.normalizeMainKey import java.io.File -import java.net.HttpURLConnection -import java.net.URL import java.util.UUID import java.util.concurrent.atomic.AtomicLong import kotlinx.coroutines.CancellationException @@ -46,7 +40,6 @@ import kotlinx.serialization.json.JsonElement import kotlinx.serialization.json.JsonObject import kotlinx.serialization.json.JsonPrimitive import kotlinx.serialization.json.buildJsonObject -import kotlin.math.max class TalkModeManager( private val context: Context, @@ -57,9 +50,6 @@ class TalkModeManager( ) { companion object { private const val tag = "TalkMode" - private const val defaultModelIdFallback = "eleven_v3" - private const val defaultOutputFormatFallback = "pcm_24000" - private const val defaultTalkProvider = "elevenlabs" private const val listenWatchdogMs = 12_000L private const val chatFinalWaitWithSubscribeMs = 45_000L private const val chatFinalWaitWithoutSubscribeMs = 6_000L @@ -84,9 +74,6 @@ class TalkModeManager( private val _lastAssistantText = MutableStateFlow(null) val lastAssistantText: StateFlow = _lastAssistantText - private val _usingFallbackTts = MutableStateFlow(false) - val usingFallbackTts: StateFlow = _usingFallbackTts - private var recognizer: SpeechRecognizer? = null private var restartJob: Job? = null private var stopRequested = false @@ -99,21 +86,11 @@ class TalkModeManager( private var lastSpokenText: String? = null private var lastInterruptedAtSeconds: Double? = null - private var defaultVoiceId: String? = null private var currentVoiceId: String? = null - private var fallbackVoiceId: String? = null - private var defaultModelId: String? = null private var currentModelId: String? = null - private var defaultOutputFormat: String? = null - private var apiKey: String? = null - private var voiceAliases: Map = emptyMap() // Interrupt-on-speech is disabled by default: starting a SpeechRecognizer during - // TTS creates an audio session conflict on OxygenOS/OnePlus that causes AudioTrack - // write to return 0 and MediaPlayer to error. Can be enabled via gateway talk config. - private var activeProviderIsElevenLabs: Boolean = true + // TTS creates an audio session conflict on some OEMs. Can be enabled via gateway talk config. private var interruptOnSpeech: Boolean = false - private var voiceOverrideActive = false - private var modelOverrideActive = false private var mainSessionKey: String = "main" @Volatile private var pendingRunId: String? = null @@ -128,14 +105,8 @@ class TalkModeManager( private var ttsJob: Job? = null private var player: MediaPlayer? = null - private var streamingSource: StreamingMediaDataSource? = null - private var pcmTrack: AudioTrack? = null - @Volatile private var pcmStopRequested = false @Volatile private var finalizeInFlight = false private var listenWatchdogJob: Job? = null - private var systemTts: TextToSpeech? = null - private var systemTtsPending: CompletableDeferred? = null - private var systemTtsPendingId: String? = null private var audioFocusRequest: AudioFocusRequest? = null private val audioFocusListener = AudioManager.OnAudioFocusChangeListener { focusChange -> @@ -208,118 +179,6 @@ class TalkModeManager( /** When true, play TTS for all final chat responses (even ones we didn't initiate). */ @Volatile var ttsOnAllResponses = false - // Streaming TTS: active session keyed by runId - private var streamingTts: ElevenLabsStreamingTts? = null - private var streamingFullText: String = "" - @Volatile private var lastHandledStreamingRunId: String? = null - private var drainingTts: ElevenLabsStreamingTts? = null - - private fun stopActiveStreamingTts() { - streamingTts?.stop() - streamingTts = null - drainingTts?.stop() - drainingTts = null - streamingFullText = "" - } - - /** Handle agent stream events — only speak assistant text, not tool calls or thinking. */ - private fun handleAgentStreamEvent(payloadJson: String?) { - if (payloadJson.isNullOrBlank()) return - val payload = try { - json.parseToJsonElement(payloadJson).asObjectOrNull() - } catch (_: Throwable) { null } ?: return - - // Only speak events for the active session — prevents TTS leaking from - // concurrent sessions/channels (privacy + correctness). - val eventSession = payload["sessionKey"]?.asStringOrNull() - val activeSession = mainSessionKey.ifBlank { "main" } - if (eventSession != null && eventSession != activeSession) return - - val stream = payload["stream"]?.asStringOrNull() ?: return - if (stream != "assistant") return // Only speak assistant text - val data = payload["data"]?.asObjectOrNull() ?: return - if (data["type"]?.asStringOrNull() == "thinking") return // Skip thinking tokens - val text = data["text"]?.asStringOrNull()?.trim() ?: return - if (text.isEmpty()) return - if (!playbackEnabled) { - stopActiveStreamingTts() - return - } - - // Start streaming session if not already active - if (streamingTts == null) { - if (!activeProviderIsElevenLabs) return // Non-ElevenLabs provider — skip streaming TTS - val voiceId = currentVoiceId ?: defaultVoiceId - val apiKey = this.apiKey - if (voiceId == null || apiKey == null) { - Log.w(tag, "streaming TTS: missing voiceId or apiKey") - return - } - val modelId = currentModelId ?: defaultModelId ?: "" - val streamModel = if (ElevenLabsStreamingTts.supportsStreaming(modelId)) { - modelId - } else { - "eleven_flash_v2_5" - } - val tts = ElevenLabsStreamingTts( - scope = scope, - voiceId = voiceId, - apiKey = apiKey, - modelId = streamModel, - outputFormat = "pcm_24000", - sampleRate = 24000, - ) - streamingTts = tts - streamingFullText = "" - _isSpeaking.value = true - _statusText.value = "Speaking…" - tts.start() - Log.d(tag, "streaming TTS started for agent assistant text") - lastHandledStreamingRunId = null // will be set on final - } - - val accepted = streamingTts?.sendText(text) ?: false - if (!accepted && streamingTts != null) { - Log.d(tag, "text diverged, restarting streaming TTS") - streamingTts?.stop() - streamingTts = null - // Restart with the new text - val voiceId2 = currentVoiceId ?: defaultVoiceId - val apiKey2 = this.apiKey - if (voiceId2 != null && apiKey2 != null) { - val modelId2 = currentModelId ?: defaultModelId ?: "" - val streamModel2 = if (ElevenLabsStreamingTts.supportsStreaming(modelId2)) modelId2 else "eleven_flash_v2_5" - val newTts = ElevenLabsStreamingTts( - scope = scope, voiceId = voiceId2, apiKey = apiKey2, - modelId = streamModel2, outputFormat = "pcm_24000", sampleRate = 24000, - ) - streamingTts = newTts - streamingFullText = text - newTts.start() - newTts.sendText(streamingFullText) - Log.d(tag, "streaming TTS restarted with new text") - } - } - } - - /** Called when chat final/error/aborted arrives — finish any active streaming TTS. */ - private fun finishStreamingTts() { - streamingFullText = "" - val tts = streamingTts ?: return - // Null out immediately so the next response creates a fresh TTS instance. - // The drain coroutine below holds a reference to this instance for cleanup. - streamingTts = null - drainingTts = tts - tts.finish() - scope.launch { - delay(500) - while (tts.isPlaying.value) { delay(200) } - if (drainingTts === tts) drainingTts = null - _isSpeaking.value = false - _statusText.value = "Ready" - } - } - fun playTtsForText(text: String) { val playbackToken = playbackGeneration.incrementAndGet() ttsJob?.cancel() @@ -338,7 +197,6 @@ class TalkModeManager( Log.d(tag, "gateway event: $event") } if (event == "agent" && ttsOnAllResponses) { - handleAgentStreamEvent(payloadJson) return } if (event != "chat") return @@ -362,27 +220,10 @@ class TalkModeManager( // Otherwise, if ttsOnAllResponses, finish streaming TTS on terminal events. val pending = pendingRunId if (pending == null || runId != pending) { - if (ttsOnAllResponses && state in listOf("final", "error", "aborted")) { - // Skip if we already handled TTS for this run (multiple final events - // can arrive on different threads for the same run). - if (lastHandledStreamingRunId == runId) { - if (pending == null || runId != pending) return - } - lastHandledStreamingRunId = runId - val stts = streamingTts - if (stts != null) { - // Finish streaming and let the drain coroutine handle playback completion. - // Don’t check hasReceivedAudio synchronously — audio may still be in flight - // from the WebSocket (EOS was just sent). The drain coroutine in finishStreamingTts - // waits for playback to complete; if ElevenLabs truly fails, the user just won’t - // hear anything (silent failure is better than double-speaking with system TTS). - finishStreamingTts() - } else if (state == "final") { - // No streaming was active — fall back to non-streaming - val text = extractTextFromChatEventMessage(obj["message"]) - if (!text.isNullOrBlank()) { - playTtsForText(text) - } + if (ttsOnAllResponses && state == "final") { + val text = extractTextFromChatEventMessage(obj["message"]) + if (!text.isNullOrBlank()) { + playTtsForText(text) } } if (pending == null || runId != pending) return @@ -419,7 +260,6 @@ class TalkModeManager( playbackEnabled = enabled if (!enabled) { playbackGeneration.incrementAndGet() - stopActiveStreamingTts() stopSpeaking() } } @@ -485,7 +325,6 @@ class TalkModeManager( _isListening.value = false _statusText.value = "Off" stopSpeaking() - _usingFallbackTts.value = false chatSubscribedSessionKey = null pendingRunId = null pendingFinal?.cancel() @@ -500,10 +339,6 @@ class TalkModeManager( recognizer?.destroy() recognizer = null } - systemTts?.stop() - systemTtsPending?.cancel() - systemTtsPending = null - systemTtsPendingId = null } private fun startListeningInternal(markListening: Boolean) { @@ -813,59 +648,19 @@ class TalkModeManager( _lastAssistantText.value = cleaned val requestedVoice = directive?.voiceId?.trim()?.takeIf { it.isNotEmpty() } - val resolvedVoice = TalkModeVoiceResolver.resolveVoiceAlias(requestedVoice, voiceAliases) - if (requestedVoice != null && resolvedVoice == null) { - Log.w(tag, "unknown voice alias: $requestedVoice") - } if (directive?.voiceId != null) { if (directive.once != true) { - currentVoiceId = resolvedVoice - voiceOverrideActive = true + currentVoiceId = requestedVoice } } if (directive?.modelId != null) { if (directive.once != true) { - currentModelId = directive.modelId - modelOverrideActive = true + currentModelId = directive.modelId?.trim()?.takeIf { it.isNotEmpty() } } } ensurePlaybackActive(playbackToken) - val apiKey = - apiKey?.trim()?.takeIf { it.isNotEmpty() } - ?: System.getenv("ELEVENLABS_API_KEY")?.trim() - val preferredVoice = resolvedVoice ?: currentVoiceId ?: defaultVoiceId - val resolvedPlaybackVoice = - if (!apiKey.isNullOrEmpty()) { - try { - TalkModeVoiceResolver.resolveVoiceId( - preferred = preferredVoice, - fallbackVoiceId = fallbackVoiceId, - defaultVoiceId = defaultVoiceId, - currentVoiceId = currentVoiceId, - voiceOverrideActive = voiceOverrideActive, - listVoices = { TalkModeVoiceResolver.listVoices(apiKey, json) }, - ) - } catch (err: Throwable) { - Log.w(tag, "list voices failed: ${err.message ?: err::class.simpleName}") - null - } - } else { - null - } - resolvedPlaybackVoice?.let { resolved -> - fallbackVoiceId = resolved.fallbackVoiceId - defaultVoiceId = resolved.defaultVoiceId - currentVoiceId = resolved.currentVoiceId - resolved.selectedVoiceName?.let { name -> - resolved.voiceId?.let { voiceId -> - Log.d(tag, "default voice selected $name ($voiceId)") - } - } - } - val voiceId = resolvedPlaybackVoice?.voiceId - _statusText.value = "Speaking…" _isSpeaking.value = true lastSpokenText = cleaned @@ -873,210 +668,99 @@ class TalkModeManager( requestAudioFocusForTts() try { - val canUseElevenLabs = !voiceId.isNullOrBlank() && !apiKey.isNullOrEmpty() - if (!canUseElevenLabs) { - if (voiceId.isNullOrBlank()) { - Log.w(tag, "missing voiceId; falling back to system voice") - } - if (apiKey.isNullOrEmpty()) { - Log.w(tag, "missing ELEVENLABS_API_KEY; falling back to system voice") - } - ensurePlaybackActive(playbackToken) - _usingFallbackTts.value = true - _statusText.value = "Speaking (System)…" - speakWithSystemTts(cleaned, playbackToken) - } else { - _usingFallbackTts.value = false - val ttsStarted = SystemClock.elapsedRealtime() - val modelId = directive?.modelId ?: currentModelId ?: defaultModelId - val request = - ElevenLabsRequest( - text = cleaned, - modelId = modelId, - outputFormat = - TalkModeRuntime.validatedOutputFormat(directive?.outputFormat ?: defaultOutputFormat), - speed = TalkModeRuntime.resolveSpeed(directive?.speed, directive?.rateWpm), - stability = TalkModeRuntime.validatedStability(directive?.stability, modelId), - similarity = TalkModeRuntime.validatedUnit(directive?.similarity), - style = TalkModeRuntime.validatedUnit(directive?.style), - speakerBoost = directive?.speakerBoost, - seed = TalkModeRuntime.validatedSeed(directive?.seed), - normalize = TalkModeRuntime.validatedNormalize(directive?.normalize), - language = TalkModeRuntime.validatedLanguage(directive?.language), - latencyTier = TalkModeRuntime.validatedLatencyTier(directive?.latencyTier), - ) - streamAndPlay(voiceId = voiceId!!, apiKey = apiKey!!, request = request, playbackToken = playbackToken) - Log.d(tag, "elevenlabs stream ok durMs=${SystemClock.elapsedRealtime() - ttsStarted}") - } + val ttsStarted = SystemClock.elapsedRealtime() + val speech = requestTalkSpeak(cleaned, directive) + playGatewaySpeech(speech, playbackToken) + Log.d(tag, "talk.speak ok durMs=${SystemClock.elapsedRealtime() - ttsStarted} provider=${speech.provider}") } catch (err: Throwable) { if (isPlaybackCancelled(err, playbackToken)) { Log.d(tag, "assistant speech cancelled") return } - Log.w(tag, "speak failed: ${err.message ?: err::class.simpleName}; falling back to system voice") - try { - ensurePlaybackActive(playbackToken) - _usingFallbackTts.value = true - _statusText.value = "Speaking (System)…" - speakWithSystemTts(cleaned, playbackToken) - } catch (fallbackErr: Throwable) { - if (isPlaybackCancelled(fallbackErr, playbackToken)) { - Log.d(tag, "assistant fallback speech cancelled") - return - } - _statusText.value = "Speak failed: ${fallbackErr.message ?: fallbackErr::class.simpleName}" - Log.w(tag, "system voice failed: ${fallbackErr.message ?: fallbackErr::class.simpleName}") - } + _statusText.value = "Speak failed: ${err.message ?: err::class.simpleName}" + Log.w(tag, "talk.speak failed: ${err.message ?: err::class.simpleName}") } finally { _isSpeaking.value = false } } - private suspend fun streamAndPlay( - voiceId: String, - apiKey: String, - request: ElevenLabsRequest, - playbackToken: Long, - ) { + private data class GatewayTalkSpeech( + val audioBase64: String, + val provider: String, + val outputFormat: String?, + val mimeType: String?, + val fileExtension: String?, + ) + + private suspend fun requestTalkSpeak(text: String, directive: TalkDirective?): GatewayTalkSpeech { + val modelId = + directive?.modelId?.trim()?.takeIf { it.isNotEmpty() } ?: currentModelId?.trim()?.takeIf { it.isNotEmpty() } + val voiceId = + directive?.voiceId?.trim()?.takeIf { it.isNotEmpty() } ?: currentVoiceId?.trim()?.takeIf { it.isNotEmpty() } + val params = + buildJsonObject { + put("text", JsonPrimitive(text)) + voiceId?.let { put("voiceId", JsonPrimitive(it)) } + modelId?.let { put("modelId", JsonPrimitive(it)) } + TalkModeRuntime.resolveSpeed(directive?.speed, directive?.rateWpm)?.let { + put("speed", JsonPrimitive(it)) + } + TalkModeRuntime.validatedStability(directive?.stability, modelId)?.let { + put("stability", JsonPrimitive(it)) + } + TalkModeRuntime.validatedUnit(directive?.similarity)?.let { + put("similarity", JsonPrimitive(it)) + } + TalkModeRuntime.validatedUnit(directive?.style)?.let { + put("style", JsonPrimitive(it)) + } + directive?.speakerBoost?.let { put("speakerBoost", JsonPrimitive(it)) } + TalkModeRuntime.validatedSeed(directive?.seed)?.let { put("seed", JsonPrimitive(it)) } + TalkModeRuntime.validatedNormalize(directive?.normalize)?.let { + put("normalize", JsonPrimitive(it)) + } + TalkModeRuntime.validatedLanguage(directive?.language)?.let { + put("language", JsonPrimitive(it)) + } + } + val res = session.request("talk.speak", params.toString()) + val root = json.parseToJsonElement(res).asObjectOrNull() ?: error("talk.speak returned invalid JSON") + val audioBase64 = root["audioBase64"].asStringOrNull()?.trim().orEmpty() + val provider = root["provider"].asStringOrNull()?.trim().orEmpty() + if (audioBase64.isEmpty()) { + error("talk.speak missing audioBase64") + } + if (provider.isEmpty()) { + error("talk.speak missing provider") + } + return GatewayTalkSpeech( + audioBase64 = audioBase64, + provider = provider, + outputFormat = root["outputFormat"].asStringOrNull()?.trim(), + mimeType = root["mimeType"].asStringOrNull()?.trim(), + fileExtension = root["fileExtension"].asStringOrNull()?.trim(), + ) + } + + private suspend fun playGatewaySpeech(speech: GatewayTalkSpeech, playbackToken: Long) { ensurePlaybackActive(playbackToken) stopSpeaking(resetInterrupt = false) ensurePlaybackActive(playbackToken) - pcmStopRequested = false - val pcmSampleRate = TalkModeRuntime.parsePcmSampleRate(request.outputFormat) - if (pcmSampleRate != null) { + val audioBytes = try { - streamAndPlayPcm( - voiceId = voiceId, - apiKey = apiKey, - request = request, - sampleRate = pcmSampleRate, - playbackToken = playbackToken, - ) - return - } catch (err: Throwable) { - if (isPlaybackCancelled(err, playbackToken) || pcmStopRequested) return - Log.w(tag, "pcm playback failed; falling back to mp3: ${err.message ?: err::class.simpleName}") + Base64.decode(speech.audioBase64, Base64.DEFAULT) + } catch (err: IllegalArgumentException) { + throw IllegalStateException("talk.speak returned invalid audio", err) } - } - - // When falling back from PCM, rewrite format to MP3 and download to file. - // File-based playback avoids custom DataSource races and is reliable across OEMs. - val mp3Request = if (request.outputFormat?.startsWith("pcm_") == true) { - request.copy(outputFormat = "mp3_44100_128") - } else { - request - } - streamAndPlayMp3(voiceId = voiceId, apiKey = apiKey, request = mp3Request, playbackToken = playbackToken) - } - - private suspend fun streamAndPlayMp3( - voiceId: String, - apiKey: String, - request: ElevenLabsRequest, - playbackToken: Long, - ) { - val dataSource = StreamingMediaDataSource() - streamingSource = dataSource - - val player = MediaPlayer() - this.player = player - - val prepared = CompletableDeferred() - val finished = CompletableDeferred() - - player.setAudioAttributes( - AudioAttributes.Builder() - .setContentType(AudioAttributes.CONTENT_TYPE_SPEECH) - .setUsage(AudioAttributes.USAGE_MEDIA) - .build(), - ) - player.setOnPreparedListener { - it.start() - prepared.complete(Unit) - } - player.setOnCompletionListener { - finished.complete(Unit) - } - player.setOnErrorListener { _, _, _ -> - finished.completeExceptionally(IllegalStateException("MediaPlayer error")) - true - } - - player.setDataSource(dataSource) - withContext(Dispatchers.Main) { - player.prepareAsync() - } - - val fetchError = CompletableDeferred() - val fetchJob = - scope.launch(Dispatchers.IO) { - try { - streamTts(voiceId = voiceId, apiKey = apiKey, request = request, sink = dataSource, playbackToken = playbackToken) - fetchError.complete(null) - } catch (err: Throwable) { - dataSource.fail() - fetchError.complete(err) + val suffix = resolveGatewayAudioSuffix(speech) + val tempFile = + withContext(Dispatchers.IO) { + File.createTempFile("tts_", suffix, context.cacheDir).apply { + writeBytes(audioBytes) } } - - Log.d(tag, "play start") - try { - ensurePlaybackActive(playbackToken) - prepared.await() - ensurePlaybackActive(playbackToken) - finished.await() - ensurePlaybackActive(playbackToken) - fetchError.await()?.let { throw it } - } finally { - fetchJob.cancel() - cleanupPlayer() - } - Log.d(tag, "play done") - } - - /** - * Download ElevenLabs audio to a temp file, then play from disk via MediaPlayer. - * Simpler and more reliable than streaming: avoids custom DataSource races and - * AudioTrack underrun issues on OxygenOS/OnePlus. - */ - private suspend fun streamAndPlayViaFile(voiceId: String, apiKey: String, request: ElevenLabsRequest) { - val tempFile = withContext(Dispatchers.IO) { - val file = File.createTempFile("tts_", ".mp3", context.cacheDir) - val conn = openTtsConnection(voiceId = voiceId, apiKey = apiKey, request = request) - try { - val payload = buildRequestPayload(request) - conn.outputStream.use { it.write(payload.toByteArray()) } - val code = conn.responseCode - if (code >= 400) { - val body = conn.errorStream?.readBytes()?.toString(Charsets.UTF_8) ?: "" - file.delete() - throw IllegalStateException("ElevenLabs failed: $code $body") - } - Log.d(tag, "elevenlabs http code=$code voiceId=$voiceId format=${request.outputFormat}") - // Manual loop so cancellation is honoured on every chunk. - // input.copyTo() is a single blocking call with no yield points; if the - // coroutine is cancelled mid-download the entire response would finish - // before cancellation was observed. - conn.inputStream.use { input -> - file.outputStream().use { out -> - val buf = ByteArray(8192) - var n: Int - while (input.read(buf).also { n = it } != -1) { - ensureActive() - out.write(buf, 0, n) - } - } - } - } catch (err: Throwable) { - file.delete() - throw err - } finally { - conn.disconnect() - } - file - } try { val player = MediaPlayer() this.player = player @@ -1094,181 +778,45 @@ class TalkModeManager( } player.setDataSource(tempFile.absolutePath) withContext(Dispatchers.IO) { player.prepare() } - Log.d(tag, "file play start bytes=${tempFile.length()}") + ensurePlaybackActive(playbackToken) player.start() finished.await() - Log.d(tag, "file play done") + ensurePlaybackActive(playbackToken) } finally { - try { cleanupPlayer() } catch (_: Throwable) {} + try { + cleanupPlayer() + } catch (_: Throwable) {} tempFile.delete() } } - private suspend fun streamAndPlayPcm( - voiceId: String, - apiKey: String, - request: ElevenLabsRequest, - sampleRate: Int, - playbackToken: Long, - ) { - ensurePlaybackActive(playbackToken) - val minBuffer = - AudioTrack.getMinBufferSize( - sampleRate, - AudioFormat.CHANNEL_OUT_MONO, - AudioFormat.ENCODING_PCM_16BIT, - ) - if (minBuffer <= 0) { - throw IllegalStateException("AudioTrack buffer size invalid: $minBuffer") + private fun resolveGatewayAudioSuffix(speech: GatewayTalkSpeech): String { + val extension = speech.fileExtension?.trim() + if (!extension.isNullOrEmpty()) { + return if (extension.startsWith(".")) extension else ".$extension" } - - val bufferSize = max(minBuffer * 2, 8 * 1024) - val track = - AudioTrack( - AudioAttributes.Builder() - .setContentType(AudioAttributes.CONTENT_TYPE_SPEECH) - .setUsage(AudioAttributes.USAGE_MEDIA) - .build(), - AudioFormat.Builder() - .setSampleRate(sampleRate) - .setChannelMask(AudioFormat.CHANNEL_OUT_MONO) - .setEncoding(AudioFormat.ENCODING_PCM_16BIT) - .build(), - bufferSize, - AudioTrack.MODE_STREAM, - AudioManager.AUDIO_SESSION_ID_GENERATE, - ) - if (track.state != AudioTrack.STATE_INITIALIZED) { - track.release() - throw IllegalStateException("AudioTrack init failed") - } - pcmTrack = track - // Don't call track.play() yet — start the track only when the first audio - // chunk arrives from ElevenLabs (see streamPcm). OxygenOS/OnePlus kills an - // AudioTrack that underruns (no data written) for ~1+ seconds, causing - // write() to return 0. Deferring play() until first data avoids the underrun. - - Log.d(tag, "pcm play start sampleRate=$sampleRate bufferSize=$bufferSize") - try { - streamPcm(voiceId = voiceId, apiKey = apiKey, request = request, track = track, playbackToken = playbackToken) - } finally { - cleanupPcmTrack() - } - Log.d(tag, "pcm play done") + val mimeType = speech.mimeType?.trim()?.lowercase() + if (mimeType == "audio/mpeg") return ".mp3" + if (mimeType == "audio/ogg") return ".ogg" + if (mimeType == "audio/wav") return ".wav" + if (mimeType == "audio/webm") return ".webm" + val outputFormat = speech.outputFormat?.trim()?.lowercase().orEmpty() + if (outputFormat == "mp3" || outputFormat.startsWith("mp3_") || outputFormat.endsWith("-mp3")) return ".mp3" + if (outputFormat == "opus" || outputFormat.startsWith("opus_")) return ".ogg" + if (outputFormat.endsWith("-wav")) return ".wav" + if (outputFormat.endsWith("-webm")) return ".webm" + return ".audio" } - private suspend fun speakWithSystemTts(text: String, playbackToken: Long) { - val trimmed = text.trim() - if (trimmed.isEmpty()) return - ensurePlaybackActive(playbackToken) - val ok = ensureSystemTts() - if (!ok) { - throw IllegalStateException("system TTS unavailable") - } - ensurePlaybackActive(playbackToken) - - val tts = systemTts ?: throw IllegalStateException("system TTS unavailable") - val utteranceId = "talk-${UUID.randomUUID()}" - val deferred = CompletableDeferred() - systemTtsPending?.cancel() - systemTtsPending = deferred - systemTtsPendingId = utteranceId - - withContext(Dispatchers.Main) { - ensurePlaybackActive(playbackToken) - val params = Bundle() - tts.speak(trimmed, TextToSpeech.QUEUE_FLUSH, params, utteranceId) - } - - withContext(Dispatchers.IO) { - try { - kotlinx.coroutines.withTimeout(180_000) { deferred.await() } - } catch (err: Throwable) { - throw err - } - ensurePlaybackActive(playbackToken) - } - } - - private suspend fun ensureSystemTts(): Boolean { - if (systemTts != null) return true - return withContext(Dispatchers.Main) { - val deferred = CompletableDeferred() - val tts = - try { - TextToSpeech(context) { status -> - deferred.complete(status == TextToSpeech.SUCCESS) - } - } catch (_: Throwable) { - deferred.complete(false) - null - } - if (tts == null) return@withContext false - - tts.setOnUtteranceProgressListener( - object : UtteranceProgressListener() { - override fun onStart(utteranceId: String?) {} - - override fun onDone(utteranceId: String?) { - if (utteranceId == null) return - if (utteranceId != systemTtsPendingId) return - systemTtsPending?.complete(Unit) - systemTtsPending = null - systemTtsPendingId = null - } - - @Suppress("OVERRIDE_DEPRECATION") - @Deprecated("Deprecated in Java") - override fun onError(utteranceId: String?) { - if (utteranceId == null) return - if (utteranceId != systemTtsPendingId) return - systemTtsPending?.completeExceptionally(IllegalStateException("system TTS error")) - systemTtsPending = null - systemTtsPendingId = null - } - - override fun onError(utteranceId: String?, errorCode: Int) { - if (utteranceId == null) return - if (utteranceId != systemTtsPendingId) return - systemTtsPending?.completeExceptionally(IllegalStateException("system TTS error $errorCode")) - systemTtsPending = null - systemTtsPendingId = null - } - }, - ) - - val ok = - try { - deferred.await() - } catch (_: Throwable) { - false - } - if (ok) { - systemTts = tts - } else { - tts.shutdown() - } - ok - } - } - - /** Stop any active TTS immediately — call when user taps mic to barge in. */ fun stopTts() { - stopActiveStreamingTts() stopSpeaking(resetInterrupt = true) _isSpeaking.value = false _statusText.value = "Listening" } private fun stopSpeaking(resetInterrupt: Boolean = true) { - pcmStopRequested = true if (!_isSpeaking.value) { cleanupPlayer() - cleanupPcmTrack() - systemTts?.stop() - systemTtsPending?.cancel() - systemTtsPending = null - systemTtsPendingId = null abandonAudioFocus() return } @@ -1277,11 +825,6 @@ class TalkModeManager( lastInterruptedAtSeconds = currentMs / 1000.0 } cleanupPlayer() - cleanupPcmTrack() - systemTts?.stop() - systemTtsPending?.cancel() - systemTtsPending = null - systemTtsPendingId = null _isSpeaking.value = false abandonAudioFocus() } @@ -1325,22 +868,6 @@ class TalkModeManager( player?.stop() player?.release() player = null - streamingSource?.close() - streamingSource = null - } - - private fun cleanupPcmTrack() { - val track = pcmTrack ?: return - try { - track.pause() - track.flush() - track.stop() - } catch (_: Throwable) { - // ignore cleanup errors - } finally { - track.release() - } - pcmTrack = null } private fun shouldInterrupt(transcript: String): Boolean { @@ -1369,71 +896,18 @@ class TalkModeManager( } private suspend fun reloadConfig() { - val envVoice = System.getenv("ELEVENLABS_VOICE_ID")?.trim() - val sagVoice = System.getenv("SAG_VOICE_ID")?.trim() - val envKey = System.getenv("ELEVENLABS_API_KEY")?.trim() try { - val res = session.request("talk.config", """{"includeSecrets":true}""") + val res = session.request("talk.config", "{}") val root = json.parseToJsonElement(res).asObjectOrNull() - val parsed = - TalkModeGatewayConfigParser.parse( - config = root?.get("config").asObjectOrNull(), - defaultProvider = defaultTalkProvider, - defaultModelIdFallback = defaultModelIdFallback, - defaultOutputFormatFallback = defaultOutputFormatFallback, - envVoice = envVoice, - sagVoice = sagVoice, - envKey = envKey, - ) - if (parsed.missingResolvedPayload) { - Log.w(tag, "talk config ignored: normalized payload missing talk.resolved") - } - + val parsed = TalkModeGatewayConfigParser.parse(root?.get("config").asObjectOrNull()) if (!isCanonicalMainSessionKey(mainSessionKey)) { mainSessionKey = parsed.mainSessionKey } - defaultVoiceId = parsed.defaultVoiceId - voiceAliases = parsed.voiceAliases - if (!voiceOverrideActive) currentVoiceId = defaultVoiceId - defaultModelId = parsed.defaultModelId - if (!modelOverrideActive) currentModelId = defaultModelId - defaultOutputFormat = parsed.defaultOutputFormat - apiKey = parsed.apiKey silenceWindowMs = parsed.silenceTimeoutMs - Log.d( - tag, - "reloadConfig apiKey=${if (apiKey != null) "set" else "null"} voiceId=$defaultVoiceId silenceTimeoutMs=${parsed.silenceTimeoutMs}", - ) - if (parsed.interruptOnSpeech != null) interruptOnSpeech = parsed.interruptOnSpeech - activeProviderIsElevenLabs = parsed.activeProvider == defaultTalkProvider - if (!activeProviderIsElevenLabs) { - // Clear ElevenLabs credentials so playAssistant won't attempt ElevenLabs calls - apiKey = null - defaultVoiceId = null - if (!voiceOverrideActive) currentVoiceId = null - Log.w(tag, "talk provider ${parsed.activeProvider} unsupported; using system voice fallback") - } else if (parsed.normalizedPayload) { - Log.d(tag, "talk config provider=elevenlabs") - } + parsed.interruptOnSpeech?.let { interruptOnSpeech = it } configLoaded = true } catch (_: Throwable) { - val fallback = - TalkModeGatewayConfigParser.fallback( - defaultProvider = defaultTalkProvider, - defaultModelIdFallback = defaultModelIdFallback, - defaultOutputFormatFallback = defaultOutputFormatFallback, - envVoice = envVoice, - sagVoice = sagVoice, - envKey = envKey, - ) - silenceWindowMs = fallback.silenceTimeoutMs - defaultVoiceId = fallback.defaultVoiceId - defaultModelId = fallback.defaultModelId - if (!modelOverrideActive) currentModelId = defaultModelId - apiKey = fallback.apiKey - voiceAliases = fallback.voiceAliases - defaultOutputFormat = fallback.defaultOutputFormat - // Keep config load retryable after transient fetch failures. + silenceWindowMs = TalkDefaults.defaultSilenceTimeoutMs configLoaded = false } } @@ -1443,189 +917,6 @@ class TalkModeManager( return obj["runId"].asStringOrNull() } - private suspend fun streamTts( - voiceId: String, - apiKey: String, - request: ElevenLabsRequest, - sink: StreamingMediaDataSource, - playbackToken: Long, - ) { - withContext(Dispatchers.IO) { - ensurePlaybackActive(playbackToken) - val conn = openTtsConnection(voiceId = voiceId, apiKey = apiKey, request = request) - try { - val payload = buildRequestPayload(request) - conn.outputStream.use { it.write(payload.toByteArray()) } - - val code = conn.responseCode - Log.d(tag, "elevenlabs http code=$code voiceId=$voiceId format=${request.outputFormat} keyLen=${apiKey.length}") - if (code >= 400) { - val message = conn.errorStream?.readBytes()?.toString(Charsets.UTF_8) ?: "" - Log.w(tag, "elevenlabs error code=$code voiceId=$voiceId body=$message") - sink.fail() - throw IllegalStateException("ElevenLabs failed: $code $message") - } - - val buffer = ByteArray(8 * 1024) - conn.inputStream.use { input -> - while (true) { - ensurePlaybackActive(playbackToken) - val read = input.read(buffer) - if (read <= 0) break - ensurePlaybackActive(playbackToken) - sink.append(buffer.copyOf(read)) - } - } - sink.finish() - } finally { - conn.disconnect() - } - } - } - - private suspend fun streamPcm( - voiceId: String, - apiKey: String, - request: ElevenLabsRequest, - track: AudioTrack, - playbackToken: Long, - ) { - withContext(Dispatchers.IO) { - ensurePlaybackActive(playbackToken) - val conn = openTtsConnection(voiceId = voiceId, apiKey = apiKey, request = request) - try { - val payload = buildRequestPayload(request) - conn.outputStream.use { it.write(payload.toByteArray()) } - - val code = conn.responseCode - if (code >= 400) { - val message = conn.errorStream?.readBytes()?.toString(Charsets.UTF_8) ?: "" - throw IllegalStateException("ElevenLabs failed: $code $message") - } - - var totalBytesWritten = 0L - var trackStarted = false - val buffer = ByteArray(8 * 1024) - conn.inputStream.use { input -> - while (true) { - if (pcmStopRequested || isPlaybackCancelled(null, playbackToken)) return@withContext - val read = input.read(buffer) - if (read <= 0) break - // Start the AudioTrack only when the first chunk is ready — avoids - // the ~1.4s underrun window while ElevenLabs prepares audio. - // OxygenOS kills a track that underruns for >1s (write() returns 0). - if (!trackStarted) { - track.play() - trackStarted = true - } - var offset = 0 - while (offset < read) { - if (pcmStopRequested || isPlaybackCancelled(null, playbackToken)) return@withContext - val wrote = - try { - track.write(buffer, offset, read - offset) - } catch (err: Throwable) { - if (pcmStopRequested || isPlaybackCancelled(err, playbackToken)) return@withContext - throw err - } - if (wrote <= 0) { - if (pcmStopRequested || isPlaybackCancelled(null, playbackToken)) return@withContext - throw IllegalStateException("AudioTrack write failed: $wrote") - } - offset += wrote - } - } - } - } finally { - conn.disconnect() - } - } - } - - private suspend fun waitForPcmDrain(track: AudioTrack, totalFrames: Long, sampleRate: Int) { - if (totalFrames <= 0) return - withContext(Dispatchers.IO) { - val drainDeadline = SystemClock.elapsedRealtime() + 15_000 - while (!pcmStopRequested && SystemClock.elapsedRealtime() < drainDeadline) { - val played = track.playbackHeadPosition.toLong().and(0xFFFFFFFFL) - if (played >= totalFrames) break - val remainingFrames = totalFrames - played - val sleepMs = ((remainingFrames * 1000L) / sampleRate.toLong()).coerceIn(12L, 120L) - delay(sleepMs) - } - } - } - - private fun openTtsConnection( - voiceId: String, - apiKey: String, - request: ElevenLabsRequest, - ): HttpURLConnection { - val baseUrl = "https://api.elevenlabs.io/v1/text-to-speech/$voiceId/stream" - val latencyTier = request.latencyTier - val url = - if (latencyTier != null) { - URL("$baseUrl?optimize_streaming_latency=$latencyTier") - } else { - URL(baseUrl) - } - val conn = url.openConnection() as HttpURLConnection - conn.requestMethod = "POST" - conn.connectTimeout = 30_000 - conn.readTimeout = 30_000 - conn.setRequestProperty("Content-Type", "application/json") - conn.setRequestProperty("Accept", resolveAcceptHeader(request.outputFormat)) - conn.setRequestProperty("xi-api-key", apiKey) - conn.doOutput = true - return conn - } - - private fun resolveAcceptHeader(outputFormat: String?): String { - val normalized = outputFormat?.trim()?.lowercase().orEmpty() - return if (normalized.startsWith("pcm_")) "audio/pcm" else "audio/mpeg" - } - - private fun buildRequestPayload(request: ElevenLabsRequest): String { - val voiceSettingsEntries = - buildJsonObject { - request.speed?.let { put("speed", JsonPrimitive(it)) } - request.stability?.let { put("stability", JsonPrimitive(it)) } - request.similarity?.let { put("similarity_boost", JsonPrimitive(it)) } - request.style?.let { put("style", JsonPrimitive(it)) } - request.speakerBoost?.let { put("use_speaker_boost", JsonPrimitive(it)) } - } - - val payload = - buildJsonObject { - put("text", JsonPrimitive(request.text)) - request.modelId?.takeIf { it.isNotEmpty() }?.let { put("model_id", JsonPrimitive(it)) } - request.outputFormat?.takeIf { it.isNotEmpty() }?.let { put("output_format", JsonPrimitive(it)) } - request.seed?.let { put("seed", JsonPrimitive(it)) } - request.normalize?.let { put("apply_text_normalization", JsonPrimitive(it)) } - request.language?.let { put("language_code", JsonPrimitive(it)) } - if (voiceSettingsEntries.isNotEmpty()) { - put("voice_settings", voiceSettingsEntries) - } - } - - return payload.toString() - } - - private data class ElevenLabsRequest( - val text: String, - val modelId: String?, - val outputFormat: String?, - val speed: Double?, - val stability: Double?, - val similarity: Double?, - val style: Double?, - val speakerBoost: Boolean?, - val seed: Long?, - val normalize: String?, - val language: String?, - val latencyTier: Int?, - ) - private object TalkModeRuntime { fun resolveSpeed(speed: Double?, rateWpm: Int?): Double? { if (rateWpm != null && rateWpm > 0) { @@ -1673,28 +964,6 @@ class TalkModeManager( return normalized } - fun validatedOutputFormat(value: String?): String? { - val trimmed = value?.trim()?.lowercase() ?: return null - if (trimmed.isEmpty()) return null - if (trimmed.startsWith("mp3_")) return trimmed - return if (parsePcmSampleRate(trimmed) != null) trimmed else null - } - - fun validatedLatencyTier(value: Int?): Int? { - if (value == null) return null - if (value < 0 || value > 4) return null - return value - } - - fun parsePcmSampleRate(value: String?): Int? { - val trimmed = value?.trim()?.lowercase() ?: return null - if (!trimmed.startsWith("pcm_")) return null - val suffix = trimmed.removePrefix("pcm_") - val digits = suffix.takeWhile { it.isDigit() } - val rate = digits.toIntOrNull() ?: return null - return if (rate in setOf(16000, 22050, 24000, 44100)) rate else null - } - fun isMessageTimestampAfter(timestamp: Double, sinceSeconds: Double): Boolean { val sinceMs = sinceSeconds * 1000 return if (timestamp > 10_000_000_000) {