feat(android): stream voice turns from mic manager events

This commit is contained in:
Ayaan Zaidi
2026-02-25 18:08:20 +05:30
committed by Ayaan Zaidi
parent 73677f2707
commit 434dc46531

View File

@@ -11,6 +11,7 @@ import android.speech.RecognitionListener
import android.speech.RecognizerIntent
import android.speech.SpeechRecognizer
import androidx.core.content.ContextCompat
import java.util.UUID
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
@@ -18,9 +19,22 @@ import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.launch
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonArray
import kotlinx.serialization.json.JsonObject
import kotlinx.serialization.json.JsonPrimitive
enum class VoiceConversationRole {
User,
Assistant,
}
data class VoiceConversationEntry(
val id: String,
val role: VoiceConversationRole,
val text: String,
val isStreaming: Boolean = false,
)
class MicCaptureManager(
private val context: Context,
private val scope: CoroutineScope,
@@ -30,8 +44,13 @@ class MicCaptureManager(
private const val speechMinSessionMs = 30_000L
private const val speechCompleteSilenceMs = 1_500L
private const val speechPossibleSilenceMs = 900L
private const val maxConversationEntries = 40
}
private data class QueuedUtterance(
val text: String,
)
private val mainHandler = Handler(Looper.getMainLooper())
private val json = Json { ignoreUnknownKeys = true }
@@ -50,16 +69,20 @@ class MicCaptureManager(
private val _queuedMessages = MutableStateFlow<List<String>>(emptyList())
val queuedMessages: StateFlow<List<String>> = _queuedMessages
private val _conversation = MutableStateFlow<List<VoiceConversationEntry>>(emptyList())
val conversation: StateFlow<List<VoiceConversationEntry>> = _conversation
private val _inputLevel = MutableStateFlow(0f)
val inputLevel: StateFlow<Float> = _inputLevel
private val _isSending = MutableStateFlow(false)
val isSending: StateFlow<Boolean> = _isSending
private val messageQueue = ArrayDeque<String>()
private val messageQueue = ArrayDeque<QueuedUtterance>()
private val sessionSegments = mutableListOf<String>()
private var lastFinalSegment: String? = null
private var pendingRunId: String? = null
private var pendingAssistantEntryId: String? = null
private var gatewayConnected = false
private var recognizer: SpeechRecognizer? = null
@@ -71,6 +94,7 @@ class MicCaptureManager(
_micEnabled.value = enabled
if (enabled) {
start()
sendQueuedIfIdle()
} else {
stop()
flushSessionToQueue()
@@ -81,38 +105,54 @@ class MicCaptureManager(
fun onGatewayConnectionChanged(connected: Boolean) {
gatewayConnected = connected
if (connected) {
if (!_micEnabled.value) {
sendQueuedIfIdle()
}
sendQueuedIfIdle()
return
}
if (!_micEnabled.value && messageQueue.isNotEmpty()) {
_statusText.value = "Queued ${messageQueue.size} message(s) · waiting for gateway"
if (messageQueue.isNotEmpty()) {
_statusText.value = queuedWaitingStatus()
}
}
fun handleGatewayEvent(event: String, payloadJson: String?) {
if (event != "chat") return
val runId = pendingRunId ?: return
if (payloadJson.isNullOrBlank()) return
val obj =
val payload =
try {
json.parseToJsonElement(payloadJson).asObjectOrNull()
} catch (_: Throwable) {
null
} ?: return
val eventRunId = obj["runId"].asStringOrNull() ?: return
if (eventRunId != runId) return
val state = obj["state"].asStringOrNull() ?: return
if (state != "final") return
if (messageQueue.isNotEmpty()) {
messageQueue.removeFirst()
publishQueue()
val runId = pendingRunId ?: return
val eventRunId = payload["runId"].asStringOrNull() ?: return
if (eventRunId != runId) return
when (payload["state"].asStringOrNull()) {
"delta" -> {
val deltaText = parseAssistantText(payload)
if (!deltaText.isNullOrBlank()) {
upsertPendingAssistant(text = deltaText.trim(), isStreaming = true)
}
}
"final" -> {
val finalText = parseAssistantText(payload)?.trim().orEmpty()
if (finalText.isNotEmpty()) {
upsertPendingAssistant(text = finalText, isStreaming = false)
} else if (pendingAssistantEntryId != null) {
updateConversationEntry(pendingAssistantEntryId!!, text = null, isStreaming = false)
}
completePendingTurn()
}
"error" -> {
val errorMessage = payload["errorMessage"].asStringOrNull()?.trim().orEmpty().ifEmpty { "Voice request failed" }
upsertPendingAssistant(text = errorMessage, isStreaming = false)
completePendingTurn()
}
"aborted" -> {
upsertPendingAssistant(text = "Response aborted", isStreaming = false)
completePendingTurn()
}
}
pendingRunId = null
_isSending.value = false
sendQueuedIfIdle()
}
private fun start() {
@@ -146,7 +186,7 @@ class MicCaptureManager(
restartJob?.cancel()
restartJob = null
_isListening.value = false
_statusText.value = if (_isSending.value) "Mic off · sending queued messages" else "Mic off"
_statusText.value = if (_isSending.value) "Mic off · sending…" else "Mic off"
_inputLevel.value = 0f
mainHandler.post {
recognizer?.cancel()
@@ -156,7 +196,7 @@ class MicCaptureManager(
}
private fun startListeningSession() {
val r = recognizer ?: return
val recognizerInstance = recognizer ?: return
val intent =
Intent(RecognizerIntent.ACTION_RECOGNIZE_SPEECH).apply {
putExtra(RecognizerIntent.EXTRA_LANGUAGE_MODEL, RecognizerIntent.LANGUAGE_MODEL_FREE_FORM)
@@ -170,12 +210,17 @@ class MicCaptureManager(
speechPossibleSilenceMs,
)
}
_statusText.value = if (_isSending.value) "Listening · queueing while gateway replies" else "Listening"
_statusText.value =
when {
_isSending.value -> "Listening · sending queued voice"
messageQueue.isNotEmpty() -> "Listening · ${messageQueue.size} queued"
else -> "Listening"
}
_isListening.value = true
r.startListening(intent)
recognizerInstance.startListening(intent)
}
private fun scheduleRestart(delayMs: Long = 350L) {
private fun scheduleRestart(delayMs: Long = 300L) {
if (stopRequested) return
if (!_micEnabled.value) return
restartJob?.cancel()
@@ -187,57 +232,68 @@ class MicCaptureManager(
try {
startListeningSession()
} catch (_: Throwable) {
// onError will retry
// retry through onError
}
}
}
}
private fun publishQueue() {
_queuedMessages.value = messageQueue.toList()
}
private fun flushSessionToQueue() {
val message = sessionSegments.joinToString("\n").trim()
val message = sessionSegments.joinToString(" ").trim()
sessionSegments.clear()
_liveTranscript.value = null
lastFinalSegment = null
if (message.isEmpty()) return
messageQueue.addLast(message)
appendConversation(
role = VoiceConversationRole.User,
text = message,
)
messageQueue.addLast(QueuedUtterance(text = message))
publishQueue()
}
private fun publishQueue() {
_queuedMessages.value = messageQueue.map { it.text }
}
private fun sendQueuedIfIdle() {
if (_micEnabled.value) return
if (_isSending.value) return
if (messageQueue.isEmpty()) {
_statusText.value = "Mic off"
if (_micEnabled.value) {
_statusText.value = "Listening"
} else {
_statusText.value = "Mic off"
}
return
}
if (!gatewayConnected) {
_statusText.value = "Queued ${messageQueue.size} message(s) · waiting for gateway"
_statusText.value = queuedWaitingStatus()
return
}
val next = messageQueue.first()
_isSending.value = true
_statusText.value = "Sending ${messageQueue.size} queued message(s)…"
_statusText.value = if (_micEnabled.value) "Listening · sending queued voice" else "Sending queued voice"
scope.launch {
try {
val runId = sendToGateway(next)
val runId = sendToGateway(next.text)
pendingRunId = runId
if (runId == null) {
if (messageQueue.isNotEmpty()) {
messageQueue.removeFirst()
publishQueue()
}
messageQueue.removeFirst()
publishQueue()
_isSending.value = false
pendingAssistantEntryId = null
sendQueuedIfIdle()
}
} catch (err: Throwable) {
_isSending.value = false
pendingRunId = null
pendingAssistantEntryId = null
_statusText.value =
if (!gatewayConnected) {
"Queued ${messageQueue.size} message(s) · waiting for gateway"
queuedWaitingStatus()
} else {
"Send failed: ${err.message ?: err::class.simpleName}"
}
@@ -245,6 +301,69 @@ class MicCaptureManager(
}
}
private fun completePendingTurn() {
if (messageQueue.isNotEmpty()) {
messageQueue.removeFirst()
publishQueue()
}
pendingRunId = null
pendingAssistantEntryId = null
_isSending.value = false
sendQueuedIfIdle()
}
private fun queuedWaitingStatus(): String {
return "${messageQueue.size} queued · waiting for gateway"
}
private fun appendConversation(
role: VoiceConversationRole,
text: String,
isStreaming: Boolean = false,
): String {
val id = UUID.randomUUID().toString()
_conversation.value =
(_conversation.value + VoiceConversationEntry(id = id, role = role, text = text, isStreaming = isStreaming))
.takeLast(maxConversationEntries)
return id
}
private fun updateConversationEntry(id: String, text: String?, isStreaming: Boolean) {
val current = _conversation.value
_conversation.value =
current.map { entry ->
if (entry.id == id) {
val updatedText = text ?: entry.text
entry.copy(text = updatedText, isStreaming = isStreaming)
} else {
entry
}
}
}
private fun upsertPendingAssistant(text: String, isStreaming: Boolean) {
val currentId = pendingAssistantEntryId
if (currentId == null) {
pendingAssistantEntryId =
appendConversation(
role = VoiceConversationRole.Assistant,
text = text,
isStreaming = isStreaming,
)
return
}
updateConversationEntry(id = currentId, text = text, isStreaming = isStreaming)
}
private fun onFinalTranscript(text: String) {
val trimmed = text.trim()
if (trimmed.isEmpty()) return
_liveTranscript.value = trimmed
if (lastFinalSegment == trimmed) return
lastFinalSegment = trimmed
sessionSegments.add(trimmed)
}
private fun disableMic(status: String) {
stopRequested = true
restartJob?.cancel()
@@ -260,15 +379,6 @@ class MicCaptureManager(
}
}
private fun onFinalTranscript(text: String) {
val trimmed = text.trim()
if (trimmed.isEmpty()) return
_liveTranscript.value = trimmed
if (lastFinalSegment == trimmed) return
lastFinalSegment = trimmed
sessionSegments.add(trimmed)
}
private fun hasMicPermission(): Boolean {
return (
ContextCompat.checkSelfPermission(context, Manifest.permission.RECORD_AUDIO) ==
@@ -276,6 +386,21 @@ class MicCaptureManager(
)
}
private fun parseAssistantText(payload: JsonObject): String? {
val message = payload["message"].asObjectOrNull() ?: return null
if (message["role"].asStringOrNull() != "assistant") return null
val content = message["content"] as? JsonArray ?: return null
val parts =
content.mapNotNull { item ->
val obj = item.asObjectOrNull() ?: return@mapNotNull null
if (obj["type"].asStringOrNull() != "text") return@mapNotNull null
obj["text"].asStringOrNull()?.trim()?.takeIf { it.isNotEmpty() }
}
if (parts.isEmpty()) return null
return parts.joinToString("\n")
}
private val listener =
object : RecognitionListener {
override fun onReadyForSpeech(params: Bundle?) {
@@ -321,17 +446,18 @@ class MicCaptureManager(
if (
error == SpeechRecognizer.ERROR_INSUFFICIENT_PERMISSIONS ||
error == SpeechRecognizer.ERROR_LANGUAGE_NOT_SUPPORTED ||
error == SpeechRecognizer.ERROR_LANGUAGE_UNAVAILABLE
error == SpeechRecognizer.ERROR_LANGUAGE_NOT_SUPPORTED ||
error == SpeechRecognizer.ERROR_LANGUAGE_UNAVAILABLE
) {
disableMic(status)
return
}
val restartDelayMs =
when (error) {
SpeechRecognizer.ERROR_NO_MATCH,
SpeechRecognizer.ERROR_SPEECH_TIMEOUT,
-> 1_500L
-> 1_200L
SpeechRecognizer.ERROR_TOO_MANY_REQUESTS -> 2_500L
else -> 600L
}
@@ -340,7 +466,11 @@ class MicCaptureManager(
override fun onResults(results: Bundle?) {
val text = results?.getStringArrayList(SpeechRecognizer.RESULTS_RECOGNITION).orEmpty().firstOrNull()
if (!text.isNullOrBlank()) onFinalTranscript(text)
if (!text.isNullOrBlank()) {
onFinalTranscript(text)
flushSessionToQueue()
sendQueuedIfIdle()
}
scheduleRestart()
}