diff --git a/kazeia-android/app/src/main/java/com/kazeia/service/KazeiaPipeline.kt b/kazeia-android/app/src/main/java/com/kazeia/service/KazeiaPipeline.kt
index 104c924..6453ecd 100644
--- a/kazeia-android/app/src/main/java/com/kazeia/service/KazeiaPipeline.kt
+++ b/kazeia-android/app/src/main/java/com/kazeia/service/KazeiaPipeline.kt
@@ -142,12 +142,22 @@ class KazeiaPipeline {
      * the echo-mode playback through the same path — otherwise each TTS
      * site reimplemented the "streaming-or-fallback" dispatch.
      */
-    suspend fun speakText(text: String) {
+    suspend fun speakText(
+        text: String,
+        // Fires the instant each synthesized sentence starts playing
+        // through the speaker, with the sentence text and its audio
+        // duration. Used by processLlmResponse to keep the KAZEIA chat
+        // bubble's text hidden until sound is audible and to pace the
+        // word-by-word reveal inside the bubble. Only wired up when the
+        // engine is a Qwen3TtsEngine; otherwise it is never invoked.
+        onSegmentPlaying: ((sentence: String, durationMs: Long) -> Unit)? = null
+    ) {
         val ttsEngine = tts ?: return
         _pipelineState.value = PipelineState.Speaking
         try {
             val qwen = ttsEngine as? com.kazeia.tts.Qwen3TtsEngine
             if (qwen != null) {
+                qwen.onSegmentPlaying = onSegmentPlaying
                 qwen.startStreamingSession()
                 val streamer = com.kazeia.tts.SentenceStreamer { s -> qwen.enqueueSentence(s) }
                 streamer.append(text)
diff --git a/kazeia-android/app/src/main/java/com/kazeia/service/KazeiaService.kt b/kazeia-android/app/src/main/java/com/kazeia/service/KazeiaService.kt
index 351b03b..3e1ec4d 100644
--- a/kazeia-android/app/src/main/java/com/kazeia/service/KazeiaService.kt
+++ b/kazeia-android/app/src/main/java/com/kazeia/service/KazeiaService.kt
@@ -1201,12 +1201,70 @@ class KazeiaService : Service() {
                 log("LLM stats: ${result.tokenCount} tokens in ${result.timeMs}ms (${result.tokensPerSecond} tok/s)")
 
                 if (responseText.isNotEmpty()) {
-                    addMessage(ChatMessage(role = ChatMessage.Role.KAZEIA, text = responseText))
                     // Mark the pipeline as Speaking for the duration of TTS so
                     // the continuous-listening mic loop drops frames and we
                     // don't feed our own speaker output back into STT.
                     _pipelineState.value = PipelineState.Speaking
-                    try { pipeline.speakText(responseText) } finally {
+                    // Create an empty KAZEIA bubble up-front so the per-sentence
+                    // reveal has somewhere to append to, but the text stays
+                    // empty until TTS audio for the first sentence starts —
+                    // matching the "conversation" feel the user asked for
+                    // (read-as-you-hear, not read-then-hear).
+                    val bubble = ChatMessage(role = ChatMessage.Role.KAZEIA, text = "")
+                    addMessage(bubble)
+                    // Reveal coroutines hang off a dedicated parent Job so the
+                    // finally block can stop all of them if speakText throws or
+                    // this coroutine is cancelled mid-speech.
+                    val revealParent = kotlinx.coroutines.Job()
+                    val revealScope = kotlinx.coroutines.CoroutineScope(
+                        kotlinx.coroutines.Dispatchers.Default + revealParent
+                    )
+                    val wordBoundary = Regex("\\s+")
+                    var revealedSoFar = ""
+                    var lastReveal: kotlinx.coroutines.Job? = null
+                    try {
+                        pipeline.speakText(responseText) { sentence, durationMs ->
+                            // Start a coroutine that appends one word at a time
+                            // over the segment's audio duration. Words are
+                            // separated on whitespace; punctuation rides with
+                            // the trailing word. The prefix (= text already
+                            // revealed from previous sentences) carries over so
+                            // earlier sentences stay on screen.
+                            val prefix = revealedSoFar
+                            val words = sentence.split(wordBoundary).filter { it.isNotBlank() }
+                            revealedSoFar =
+                                if (prefix.isEmpty()) sentence
+                                else "$prefix $sentence"
+                            if (words.isEmpty()) return@speakText
+                            val perWordMs = (durationMs / words.size).coerceAtLeast(40L)
+                            val previous = lastReveal
+                            lastReveal = revealScope.launch {
+                                // The 40 ms floor can make a reveal outlast its audio.
+                                // Stop the previous sentence's reveal first so it cannot
+                                // overwrite this one with a shorter, stale string.
+                                previous?.cancel()
+                                previous?.join()
+                                val sb = StringBuilder(prefix)
+                                if (prefix.isNotEmpty()) sb.append(' ')
+                                // Immediately reveal the first word so there's
+                                // no visible gap between audio start and text.
+                                sb.append(words[0])
+                                updateMessageText(bubble.id, sb.toString())
+                                for (i in 1 until words.size) {
+                                    kotlinx.coroutines.delay(perWordMs)
+                                    sb.append(' ').append(words[i])
+                                    updateMessageText(bubble.id, sb.toString())
+                                }
+                            }
+                        }
+                        // Let the final sentence finish revealing. join() only throws
+                        // if this coroutine is cancelled, which must propagate.
+                        lastReveal?.join()
+                    } finally {
+                        revealParent.cancel()
+                        // Always end on the full response: covers a TTS failure and
+                        // engines that never fire the callback, so no reply stays hidden.
+                        updateMessageText(bubble.id, responseText)
                         _pipelineState.value =
                             if (_isListening.value) PipelineState.Listening else PipelineState.Idle
                     }
@@ -1231,6 +1289,23 @@ class KazeiaService : Service() {
         _messages.value = _messages.value + message
     }
 
+    /** Replace the text of an existing message (identified by id) in the
+     * message list. Used by the progressive-reveal flow to grow a
+     * KAZEIA message word-by-word as TTS audio plays. No-op when no
+     * message has that id.
+     * NOTE(review): this is a read-then-write of _messages.value and it
+     * runs on the reveal coroutines (Dispatchers.Default); a concurrent
+     * addMessage could be overwritten — confirm callers are serialized. */
+    private fun updateMessageText(id: Long, newText: String) {
+        val current = _messages.value
+        val idx = current.indexOfLast { it.id == id }
+        if (idx < 0) return
+        val m = current[idx]
+        _messages.value = current.toMutableList().also {
+            it[idx] = m.copy(text = newText)
+        }
+    }
+
     private fun createNotification(): Notification {
         val intent = Intent(this, ChatActivity::class.java)
         val pendingIntent = PendingIntent.getActivity(
diff --git a/kazeia-android/app/src/main/java/com/kazeia/tts/Qwen3TtsEngine.kt b/kazeia-android/app/src/main/java/com/kazeia/tts/Qwen3TtsEngine.kt
index 33e03df..6edf736 100644
--- a/kazeia-android/app/src/main/java/com/kazeia/tts/Qwen3TtsEngine.kt
+++ b/kazeia-android/app/src/main/java/com/kazeia/tts/Qwen3TtsEngine.kt
@@ -3376,6 +3376,15 @@ class Qwen3TtsEngine(
     private var sessionMpJob: kotlinx.coroutines.Job? = null
     private val sessionMpSegIdx = java.util.concurrent.atomic.AtomicInteger(0)
 
+    /**
+     * Fires the moment a synthesized segment starts playing through the
+     * speaker. [sentence] is the original text submitted to
+     * [enqueueSentence], [durationMs] is the WAV duration so the caller
+     * can drive a progressive-reveal UI timer matched to speech pacing.
+     * Set before calling [startStreamingSession]; cleared on session end.
+     */
+    var onSegmentPlaying: ((sentence: String, durationMs: Long) -> Unit)? = null
+
     private fun startStreamingSessionMp() {
         if (sessionMpQueue != null) return
         sessionMpSegIdx.set(0)
@@ -3389,7 +3398,10 @@ class Qwen3TtsEngine(
         // "beg beg" with one player-per-seg). The rendezvous channel has
         // capacity 2 so the synth worker can stay one seg ahead of the
         // currently playing seg without growing disk use.
-        val wavChan = kotlinx.coroutines.channels.Channel<Pair<Int, String>>(capacity = 2)
+        // Carry (segIdx, wavPath, sentence, durationMs) together so the
+        // playback worker can invoke onSegmentPlaying with the matching
+        // text and audio length when the segment actually starts playing.
+        val wavChan = kotlinx.coroutines.channels.Channel<SegmentReady>(capacity = 2)
         val scope = kotlinx.coroutines.CoroutineScope(kotlinx.coroutines.Dispatchers.IO)
         val synthJob = scope.launch {
             for (sentence in sentenceChan) {
@@ -3400,8 +3412,9 @@ class Qwen3TtsEngine(
                     if (audio.isEmpty()) continue
                     val wavPath = "${context?.cacheDir?.absolutePath ?: "/data/local/tmp/kazeia"}/tts_seg_${segIdx}.wav"
                     saveWav(wavPath, audio)
-                    nlog("MP seg $segIdx synthesized (${System.currentTimeMillis() - tSynth}ms), queued for playback")
-                    wavChan.send(segIdx to wavPath)
+                    val durationMs = audio.size * 1000L / SR
+                    nlog("MP seg $segIdx synthesized (${System.currentTimeMillis() - tSynth}ms, ${durationMs}ms audio), queued for playback")
+                    wavChan.send(SegmentReady(segIdx, wavPath, sentence, durationMs))
                 } catch (e: Exception) {
                     nlog("MP synth error: ${e.message}")
                 }
@@ -3424,7 +3437,7 @@ class Qwen3TtsEngine(
      * channel closes AND the final segment finishes.
      */
     private suspend fun playChainedMediaPlayers(
-        wavChan: kotlinx.coroutines.channels.ReceiveChannel<Pair<Int, String>>
+        wavChan: kotlinx.coroutines.channels.ReceiveChannel<SegmentReady>
     ) {
         val attrs = android.media.AudioAttributes.Builder()
             .setUsage(android.media.AudioAttributes.USAGE_MEDIA)
@@ -3451,18 +3464,19 @@ class Qwen3TtsEngine(
         }
 
         var current: android.media.MediaPlayer? = null
-        var currentInfo: Pair<Int, String>? = null
+        var currentInfo: SegmentReady? = null
         var next: android.media.MediaPlayer? = null
-        var nextInfo: Pair<Int, String>? = null
+        var nextInfo: SegmentReady? = null
 
         try {
             // Bootstrap: wait for first WAV.
             val first = wavChan.receiveCatching().getOrNull() ?: return
             currentInfo = first
-            current = prepareMp(first.second, first.first)
+            current = prepareMp(first.wavPath, first.segIdx)
             current!!.setOnCompletionListener { it.release() }
             current!!.start()
-            nlog("MP seg ${first.first} started (chained)")
+            notifySegmentPlaying(first)
+            nlog("MP seg ${first.segIdx} started (chained, ${first.durationMs}ms)")
 
             while (true) {
                 // Fetch next WAV (may block). If channel closes, let
@@ -3470,24 +3484,28 @@ class Qwen3TtsEngine(
                 val upcoming = wavChan.receiveCatching().getOrNull()
                 if (upcoming == null) break
                 nextInfo = upcoming
-                next = prepareMp(upcoming.second, upcoming.first)
+                next = prepareMp(upcoming.wavPath, upcoming.segIdx)
                 current!!.setNextMediaPlayer(next)
-                nlog("MP seg ${upcoming.first} queued as next")
+                nlog("MP seg ${upcoming.segIdx} queued as next")
 
                 // Wait for current seg to finish playing before rotating.
-                val prevFile = currentInfo!!.second
-                waitForPlaybackCompletion(current!!, currentInfo!!.first)
+                val prevFile = currentInfo!!.wavPath
+                waitForPlaybackCompletion(current!!, currentInfo!!.segIdx)
                 try { java.io.File(prevFile).delete() } catch (_: Exception) {}
                 current = next
                 currentInfo = nextInfo
+                // `next` player was chained via setNextMediaPlayer and has
+                // auto-started at this point; notify the UI so it can start
+                // revealing the sentence in sync with the audio.
+                notifySegmentPlaying(upcoming)
                 next = null
                 nextInfo = null
             }
 
             // Drain: wait for the last prepared player to finish.
             if (current != null && currentInfo != null) {
-                waitForPlaybackCompletion(current!!, currentInfo!!.first)
-                try { java.io.File(currentInfo!!.second).delete() } catch (_: Exception) {}
+                waitForPlaybackCompletion(current!!, currentInfo!!.segIdx)
+                try { java.io.File(currentInfo!!.wavPath).delete() } catch (_: Exception) {}
             }
         } catch (e: Exception) {
             nlog("MP playback chain error: ${e.message}")
@@ -3497,6 +3515,36 @@ class Qwen3TtsEngine(
         }
     }
 
+    /**
+     * Payload handed from the synth worker to the playback worker so
+     * the UI can be notified with matching text + duration when each
+     * segment starts playing.
+     *
+     * @property segIdx segment index, used for log lines and the WAV file name.
+     * @property wavPath path of the WAV file written for this segment.
+     * @property sentence text that was synthesized into this segment.
+     * @property durationMs audio length in ms, computed as audio.size * 1000 / SR.
+     */
+    private data class SegmentReady(
+        val segIdx: Int,
+        val wavPath: String,
+        val sentence: String,
+        val durationMs: Long
+    )
+
+    /**
+     * Invokes [onSegmentPlaying] for [seg]. A throwing UI callback is
+     * logged instead of silently dropped, and never rethrown, so it
+     * cannot abort the playback chain.
+     */
+    private fun notifySegmentPlaying(seg: SegmentReady) {
+        try {
+            onSegmentPlaying?.invoke(seg.sentence, seg.durationMs)
+        } catch (e: Exception) {
+            nlog("MP seg ${seg.segIdx} onSegmentPlaying failed: ${e.message}")
+        }
+    }
+
     private suspend fun waitForPlaybackCompletion(
         mp: android.media.MediaPlayer, segIdx: Int
     ) {
@@ -3520,6 +3568,7 @@ class Qwen3TtsEngine(
         chan.close()
         try { sessionMpJob?.join() } catch (_: Exception) {}
         sessionMpQueue = null; sessionMpJob = null
+        onSegmentPlaying = null
         nlog("streaming session closed (MediaPlayer fallback)")
     }
 