From 434dc46531a50a9ec0ca3a781fb680627a5f04dd Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Wed, 25 Feb 2026 18:08:20 +0530 Subject: [PATCH] feat(android): stream voice turns from mic manager events --- .../android/voice/MicCaptureManager.kt | 236 ++++++++++++++---- 1 file changed, 183 insertions(+), 53 deletions(-) diff --git a/apps/android/app/src/main/java/ai/openclaw/android/voice/MicCaptureManager.kt b/apps/android/app/src/main/java/ai/openclaw/android/voice/MicCaptureManager.kt index 26f250013ef..603cd3d324b 100644 --- a/apps/android/app/src/main/java/ai/openclaw/android/voice/MicCaptureManager.kt +++ b/apps/android/app/src/main/java/ai/openclaw/android/voice/MicCaptureManager.kt @@ -11,6 +11,7 @@ import android.speech.RecognitionListener import android.speech.RecognizerIntent import android.speech.SpeechRecognizer import androidx.core.content.ContextCompat +import java.util.UUID import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job import kotlinx.coroutines.delay @@ -18,9 +19,22 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.launch import kotlinx.serialization.json.Json +import kotlinx.serialization.json.JsonArray import kotlinx.serialization.json.JsonObject import kotlinx.serialization.json.JsonPrimitive +enum class VoiceConversationRole { + User, + Assistant, +} + +data class VoiceConversationEntry( + val id: String, + val role: VoiceConversationRole, + val text: String, + val isStreaming: Boolean = false, +) + class MicCaptureManager( private val context: Context, private val scope: CoroutineScope, @@ -30,8 +44,13 @@ class MicCaptureManager( private const val speechMinSessionMs = 30_000L private const val speechCompleteSilenceMs = 1_500L private const val speechPossibleSilenceMs = 900L + private const val maxConversationEntries = 40 } + private data class QueuedUtterance( + val text: String, + ) + private val mainHandler = Handler(Looper.getMainLooper()) private val json = Json { ignoreUnknownKeys = true } @@ -50,16 +69,20 @@ class MicCaptureManager( private val _queuedMessages = MutableStateFlow>(emptyList()) val queuedMessages: StateFlow> = _queuedMessages + private val _conversation = MutableStateFlow>(emptyList()) + val conversation: StateFlow> = _conversation + private val _inputLevel = MutableStateFlow(0f) val inputLevel: StateFlow = _inputLevel private val _isSending = MutableStateFlow(false) val isSending: StateFlow = _isSending - private val messageQueue = ArrayDeque() + private val messageQueue = ArrayDeque() private val sessionSegments = mutableListOf() private var lastFinalSegment: String? = null private var pendingRunId: String? = null + private var pendingAssistantEntryId: String? = null private var gatewayConnected = false private var recognizer: SpeechRecognizer? = null @@ -71,6 +94,7 @@ class MicCaptureManager( _micEnabled.value = enabled if (enabled) { start() + sendQueuedIfIdle() } else { stop() flushSessionToQueue() @@ -81,38 +105,54 @@ class MicCaptureManager( fun onGatewayConnectionChanged(connected: Boolean) { gatewayConnected = connected if (connected) { - if (!_micEnabled.value) { - sendQueuedIfIdle() - } + sendQueuedIfIdle() return } - if (!_micEnabled.value && messageQueue.isNotEmpty()) { - _statusText.value = "Queued ${messageQueue.size} message(s) · waiting for gateway" + if (messageQueue.isNotEmpty()) { + _statusText.value = queuedWaitingStatus() } } fun handleGatewayEvent(event: String, payloadJson: String?) { if (event != "chat") return - val runId = pendingRunId ?: return if (payloadJson.isNullOrBlank()) return - val obj = + val payload = try { json.parseToJsonElement(payloadJson).asObjectOrNull() } catch (_: Throwable) { null } ?: return - val eventRunId = obj["runId"].asStringOrNull() ?: return - if (eventRunId != runId) return - val state = obj["state"].asStringOrNull() ?: return - if (state != "final") return - if (messageQueue.isNotEmpty()) { - messageQueue.removeFirst() - publishQueue() + val runId = pendingRunId ?: return + val eventRunId = payload["runId"].asStringOrNull() ?: return + if (eventRunId != runId) return + + when (payload["state"].asStringOrNull()) { + "delta" -> { + val deltaText = parseAssistantText(payload) + if (!deltaText.isNullOrBlank()) { + upsertPendingAssistant(text = deltaText.trim(), isStreaming = true) + } + } + "final" -> { + val finalText = parseAssistantText(payload)?.trim().orEmpty() + if (finalText.isNotEmpty()) { + upsertPendingAssistant(text = finalText, isStreaming = false) + } else if (pendingAssistantEntryId != null) { + updateConversationEntry(pendingAssistantEntryId!!, text = null, isStreaming = false) + } + completePendingTurn() + } + "error" -> { + val errorMessage = payload["errorMessage"].asStringOrNull()?.trim().orEmpty().ifEmpty { "Voice request failed" } + upsertPendingAssistant(text = errorMessage, isStreaming = false) + completePendingTurn() + } + "aborted" -> { + upsertPendingAssistant(text = "Response aborted", isStreaming = false) + completePendingTurn() + } } - pendingRunId = null - _isSending.value = false - sendQueuedIfIdle() } private fun start() { @@ -146,7 +186,7 @@ class MicCaptureManager( restartJob?.cancel() restartJob = null _isListening.value = false - _statusText.value = if (_isSending.value) "Mic off · sending queued messages…" else "Mic off" + _statusText.value = if (_isSending.value) "Mic off · sending…" else "Mic off" _inputLevel.value = 0f mainHandler.post { recognizer?.cancel() @@ -156,7 +196,7 @@ class MicCaptureManager( } private fun startListeningSession() { - val r = recognizer ?: return + val recognizerInstance = recognizer ?: return val intent = Intent(RecognizerIntent.ACTION_RECOGNIZE_SPEECH).apply { putExtra(RecognizerIntent.EXTRA_LANGUAGE_MODEL, RecognizerIntent.LANGUAGE_MODEL_FREE_FORM) @@ -170,12 +210,17 @@ class MicCaptureManager( speechPossibleSilenceMs, ) } - _statusText.value = if (_isSending.value) "Listening · queueing while gateway replies" else "Listening" + _statusText.value = + when { + _isSending.value -> "Listening · sending queued voice" + messageQueue.isNotEmpty() -> "Listening · ${messageQueue.size} queued" + else -> "Listening" + } _isListening.value = true - r.startListening(intent) + recognizerInstance.startListening(intent) } - private fun scheduleRestart(delayMs: Long = 350L) { + private fun scheduleRestart(delayMs: Long = 300L) { if (stopRequested) return if (!_micEnabled.value) return restartJob?.cancel() @@ -187,57 +232,68 @@ class MicCaptureManager( try { startListeningSession() } catch (_: Throwable) { - // onError will retry + // retry through onError } } } } - private fun publishQueue() { - _queuedMessages.value = messageQueue.toList() - } - private fun flushSessionToQueue() { - val message = sessionSegments.joinToString("\n").trim() + val message = sessionSegments.joinToString(" ").trim() sessionSegments.clear() _liveTranscript.value = null lastFinalSegment = null if (message.isEmpty()) return - messageQueue.addLast(message) + + appendConversation( + role = VoiceConversationRole.User, + text = message, + ) + messageQueue.addLast(QueuedUtterance(text = message)) publishQueue() } + private fun publishQueue() { + _queuedMessages.value = messageQueue.map { it.text } + } + private fun sendQueuedIfIdle() { - if (_micEnabled.value) return if (_isSending.value) return if (messageQueue.isEmpty()) { - _statusText.value = "Mic off" + if (_micEnabled.value) { + _statusText.value = "Listening" + } else { + _statusText.value = "Mic off" + } return } if (!gatewayConnected) { - _statusText.value = "Queued ${messageQueue.size} message(s) · waiting for gateway" + _statusText.value = queuedWaitingStatus() return } + val next = messageQueue.first() _isSending.value = true - _statusText.value = "Sending ${messageQueue.size} queued message(s)…" + _statusText.value = if (_micEnabled.value) "Listening · sending queued voice" else "Sending queued voice" + scope.launch { try { - val runId = sendToGateway(next) + val runId = sendToGateway(next.text) pendingRunId = runId if (runId == null) { - if (messageQueue.isNotEmpty()) { - messageQueue.removeFirst() - publishQueue() - } + messageQueue.removeFirst() + publishQueue() _isSending.value = false + pendingAssistantEntryId = null sendQueuedIfIdle() } } catch (err: Throwable) { _isSending.value = false + pendingRunId = null + pendingAssistantEntryId = null _statusText.value = if (!gatewayConnected) { - "Queued ${messageQueue.size} message(s) · waiting for gateway" + queuedWaitingStatus() } else { "Send failed: ${err.message ?: err::class.simpleName}" } @@ -245,6 +301,69 @@ class MicCaptureManager( } } + private fun completePendingTurn() { + if (messageQueue.isNotEmpty()) { + messageQueue.removeFirst() + publishQueue() + } + pendingRunId = null + pendingAssistantEntryId = null + _isSending.value = false + sendQueuedIfIdle() + } + + private fun queuedWaitingStatus(): String { + return "${messageQueue.size} queued · waiting for gateway" + } + + private fun appendConversation( + role: VoiceConversationRole, + text: String, + isStreaming: Boolean = false, + ): String { + val id = UUID.randomUUID().toString() + _conversation.value = + (_conversation.value + VoiceConversationEntry(id = id, role = role, text = text, isStreaming = isStreaming)) + .takeLast(maxConversationEntries) + return id + } + + private fun updateConversationEntry(id: String, text: String?, isStreaming: Boolean) { + val current = _conversation.value + _conversation.value = + current.map { entry -> + if (entry.id == id) { + val updatedText = text ?: entry.text + entry.copy(text = updatedText, isStreaming = isStreaming) + } else { + entry + } + } + } + + private fun upsertPendingAssistant(text: String, isStreaming: Boolean) { + val currentId = pendingAssistantEntryId + if (currentId == null) { + pendingAssistantEntryId = + appendConversation( + role = VoiceConversationRole.Assistant, + text = text, + isStreaming = isStreaming, + ) + return + } + updateConversationEntry(id = currentId, text = text, isStreaming = isStreaming) + } + + private fun onFinalTranscript(text: String) { + val trimmed = text.trim() + if (trimmed.isEmpty()) return + _liveTranscript.value = trimmed + if (lastFinalSegment == trimmed) return + lastFinalSegment = trimmed + sessionSegments.add(trimmed) + } + private fun disableMic(status: String) { stopRequested = true restartJob?.cancel() @@ -260,15 +379,6 @@ class MicCaptureManager( } } - private fun onFinalTranscript(text: String) { - val trimmed = text.trim() - if (trimmed.isEmpty()) return - _liveTranscript.value = trimmed - if (lastFinalSegment == trimmed) return - lastFinalSegment = trimmed - sessionSegments.add(trimmed) - } - private fun hasMicPermission(): Boolean { return ( ContextCompat.checkSelfPermission(context, Manifest.permission.RECORD_AUDIO) == @@ -276,6 +386,21 @@ class MicCaptureManager( ) } + private fun parseAssistantText(payload: JsonObject): String? { + val message = payload["message"].asObjectOrNull() ?: return null + if (message["role"].asStringOrNull() != "assistant") return null + val content = message["content"] as? JsonArray ?: return null + + val parts = + content.mapNotNull { item -> + val obj = item.asObjectOrNull() ?: return@mapNotNull null + if (obj["type"].asStringOrNull() != "text") return@mapNotNull null + obj["text"].asStringOrNull()?.trim()?.takeIf { it.isNotEmpty() } + } + if (parts.isEmpty()) return null + return parts.joinToString("\n") + } + private val listener = object : RecognitionListener { override fun onReadyForSpeech(params: Bundle?) { @@ -321,17 +446,18 @@ class MicCaptureManager( if ( error == SpeechRecognizer.ERROR_INSUFFICIENT_PERMISSIONS || - error == SpeechRecognizer.ERROR_LANGUAGE_NOT_SUPPORTED || - error == SpeechRecognizer.ERROR_LANGUAGE_UNAVAILABLE + error == SpeechRecognizer.ERROR_LANGUAGE_NOT_SUPPORTED || + error == SpeechRecognizer.ERROR_LANGUAGE_UNAVAILABLE ) { disableMic(status) return } + val restartDelayMs = when (error) { SpeechRecognizer.ERROR_NO_MATCH, SpeechRecognizer.ERROR_SPEECH_TIMEOUT, - -> 1_500L + -> 1_200L SpeechRecognizer.ERROR_TOO_MANY_REQUESTS -> 2_500L else -> 600L } @@ -340,7 +466,11 @@ class MicCaptureManager( override fun onResults(results: Bundle?) { val text = results?.getStringArrayList(SpeechRecognizer.RESULTS_RECOGNITION).orEmpty().firstOrNull() - if (!text.isNullOrBlank()) onFinalTranscript(text) + if (!text.isNullOrBlank()) { + onFinalTranscript(text) + flushSessionToQueue() + sendQueuedIfIdle() + } scheduleRestart() }