UI: reveal Kazeia reply in sync with TTS audio (per-sentence, per-word)

Matches the 'conversation' feel the user asked for. Previously the
full LLM response appeared in the chat as soon as generation
finished, then audio played 5–10 s later — text and sound felt
decoupled. Now:

- The KAZEIA bubble is created empty and only starts filling when
  the first TTS segment actually starts playing through the speaker
  (we already split the response by sentence for the chained-
  MediaPlayer pipeline; that split drives the reveal too).
- Inside each sentence, words are appended one by one at a cadence
  of (audio duration / word count) — slower sentences reveal slower,
  matching speech pacing. The first word of each sentence appears
  immediately so audio and text stay aligned at the start.

Implementation:
- Qwen3TtsEngine: added `onSegmentPlaying(sentence, durationMs)`
  listener, invoked from the chained-MediaPlayer worker the moment
  each segment's MediaPlayer.start() lands. Sentence + duration are
  carried end-to-end via a new SegmentReady data class.
- KazeiaPipeline.speakText: forwards an optional listener down to
  the TTS engine, same signature.
- KazeiaService: new updateMessageText(id, text) helper. In
  processLlmResponse, the bubble is added empty before speakText and
  grown by a reveal coroutine per sentence; after speakText returns
  we snap to the full text as a safety net.

No change to the stream_llm debug intent path — it still uses the
old enqueueSentence flow directly and doesn't need the reveal (no
UI bubble there).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Kazeia Team 2026-04-14 22:58:18 +02:00
parent 6a958c1a10
commit f17131aefb
3 changed files with 111 additions and 17 deletions

View File

@ -142,12 +142,21 @@ class KazeiaPipeline {
* the echo-mode playback through the same path otherwise each TTS * the echo-mode playback through the same path otherwise each TTS
* site reimplemented the "streaming-or-fallback" dispatch. * site reimplemented the "streaming-or-fallback" dispatch.
*/ */
suspend fun speakText(text: String) { suspend fun speakText(
text: String,
// Fires the instant each synthesized sentence starts playing
// through the speaker, with the sentence text and its audio
// duration. Used by processLlmResponse to defer the KAZEIA
// chat bubble appearance until sound is audible and to pace
// word-by-word reveal inside the bubble.
onSegmentPlaying: ((sentence: String, durationMs: Long) -> Unit)? = null
) {
val ttsEngine = tts ?: return val ttsEngine = tts ?: return
_pipelineState.value = PipelineState.Speaking _pipelineState.value = PipelineState.Speaking
try { try {
val qwen = ttsEngine as? com.kazeia.tts.Qwen3TtsEngine val qwen = ttsEngine as? com.kazeia.tts.Qwen3TtsEngine
if (qwen != null) { if (qwen != null) {
qwen.onSegmentPlaying = onSegmentPlaying
qwen.startStreamingSession() qwen.startStreamingSession()
val streamer = com.kazeia.tts.SentenceStreamer { s -> qwen.enqueueSentence(s) } val streamer = com.kazeia.tts.SentenceStreamer { s -> qwen.enqueueSentence(s) }
streamer.append(text) streamer.append(text)

View File

@ -1201,12 +1201,55 @@ class KazeiaService : Service() {
log("LLM stats: ${result.tokenCount} tokens in ${result.timeMs}ms (${result.tokensPerSecond} tok/s)") log("LLM stats: ${result.tokenCount} tokens in ${result.timeMs}ms (${result.tokensPerSecond} tok/s)")
if (responseText.isNotEmpty()) { if (responseText.isNotEmpty()) {
addMessage(ChatMessage(role = ChatMessage.Role.KAZEIA, text = responseText))
// Mark the pipeline as Speaking for the duration of TTS so // Mark the pipeline as Speaking for the duration of TTS so
// the continuous-listening mic loop drops frames and we // the continuous-listening mic loop drops frames and we
// don't feed our own speaker output back into STT. // don't feed our own speaker output back into STT.
_pipelineState.value = PipelineState.Speaking _pipelineState.value = PipelineState.Speaking
try { pipeline.speakText(responseText) } finally { // Create an empty KAZEIA bubble up-front so the per-sentence
// reveal has somewhere to append to, but the text stays
// empty until TTS audio for the first sentence starts —
// matching the "conversation" feel the user asked for
// (read-as-you-hear, not read-then-hear).
val bubble = ChatMessage(role = ChatMessage.Role.KAZEIA, text = "")
addMessage(bubble)
val revealScope = kotlinx.coroutines.CoroutineScope(kotlinx.coroutines.Dispatchers.Default)
var revealedSoFar = ""
val revealJobs = mutableListOf<kotlinx.coroutines.Job>()
try {
pipeline.speakText(responseText) { sentence, durationMs ->
// Start a coroutine that appends one word at a time
// over the segment's audio duration. Words are
// separated on whitespace; punctuation rides with
// the trailing word. The prefix (= text already
// revealed from previous sentences) carries over so
// earlier sentences stay on screen.
val prefix = revealedSoFar
val words = sentence.split(Regex("\\s+")).filter { it.isNotBlank() }
revealedSoFar =
if (prefix.isEmpty()) sentence
else "$prefix $sentence"
if (words.isEmpty()) return@speakText
val perWordMs = (durationMs / words.size).coerceAtLeast(40L)
val job = revealScope.launch {
val sb = StringBuilder(prefix)
if (prefix.isNotEmpty()) sb.append(' ')
// Immediately reveal the first word so there's
// no visible gap between audio start and text.
sb.append(words[0])
updateMessageText(bubble.id, sb.toString())
for (i in 1 until words.size) {
kotlinx.coroutines.delay(perWordMs)
sb.append(' ').append(words[i])
updateMessageText(bubble.id, sb.toString())
}
}
revealJobs.add(job)
}
// After all segments finished playing, ensure the full
// text is visible even if a reveal job was racing.
revealJobs.forEach { try { it.join() } catch (_: Exception) {} }
updateMessageText(bubble.id, responseText)
} finally {
_pipelineState.value = if (_isListening.value) _pipelineState.value = if (_isListening.value)
PipelineState.Listening else PipelineState.Idle PipelineState.Listening else PipelineState.Idle
} }
@ -1231,6 +1274,19 @@ class KazeiaService : Service() {
_messages.value = _messages.value + message _messages.value = _messages.value + message
} }
/** Replace the text of an existing message (identified by id) in the
* message list. Used by the progressive-reveal flow to grow a
* KAZEIA message word-by-word as TTS audio plays. */
private fun updateMessageText(id: Long, newText: String) {
val current = _messages.value
val idx = current.indexOfLast { it.id == id }
if (idx < 0) return
val m = current[idx]
_messages.value = current.toMutableList().also {
it[idx] = m.copy(text = newText)
}
}
private fun createNotification(): Notification { private fun createNotification(): Notification {
val intent = Intent(this, ChatActivity::class.java) val intent = Intent(this, ChatActivity::class.java)
val pendingIntent = PendingIntent.getActivity( val pendingIntent = PendingIntent.getActivity(

View File

@ -3376,6 +3376,15 @@ class Qwen3TtsEngine(
private var sessionMpJob: kotlinx.coroutines.Job? = null private var sessionMpJob: kotlinx.coroutines.Job? = null
private val sessionMpSegIdx = java.util.concurrent.atomic.AtomicInteger(0) private val sessionMpSegIdx = java.util.concurrent.atomic.AtomicInteger(0)
/**
* Fires the moment a synthesized segment starts playing through the
* speaker. [sentence] is the original text submitted to
* [enqueueSentence], [durationMs] is the WAV duration so the caller
* can drive a progressive-reveal UI timer matched to speech pacing.
* Set before calling [startStreamingSession]; cleared on session end.
*/
var onSegmentPlaying: ((sentence: String, durationMs: Long) -> Unit)? = null
private fun startStreamingSessionMp() { private fun startStreamingSessionMp() {
if (sessionMpQueue != null) return if (sessionMpQueue != null) return
sessionMpSegIdx.set(0) sessionMpSegIdx.set(0)
@ -3389,7 +3398,10 @@ class Qwen3TtsEngine(
// "beg beg" with one player-per-seg). The rendezvous channel has // "beg beg" with one player-per-seg). The rendezvous channel has
// capacity 2 so the synth worker can stay one seg ahead of the // capacity 2 so the synth worker can stay one seg ahead of the
// currently playing seg without growing disk use. // currently playing seg without growing disk use.
val wavChan = kotlinx.coroutines.channels.Channel<Pair<Int, String>>(capacity = 2) // Carry (segIdx, wavPath, sentence, durationMs) together so the
// playback worker can invoke onSegmentPlaying with the matching
// text and audio length when the segment actually starts playing.
val wavChan = kotlinx.coroutines.channels.Channel<SegmentReady>(capacity = 2)
val scope = kotlinx.coroutines.CoroutineScope(kotlinx.coroutines.Dispatchers.IO) val scope = kotlinx.coroutines.CoroutineScope(kotlinx.coroutines.Dispatchers.IO)
val synthJob = scope.launch { val synthJob = scope.launch {
for (sentence in sentenceChan) { for (sentence in sentenceChan) {
@ -3400,8 +3412,9 @@ class Qwen3TtsEngine(
if (audio.isEmpty()) continue if (audio.isEmpty()) continue
val wavPath = "${context?.cacheDir?.absolutePath ?: "/data/local/tmp/kazeia"}/tts_seg_${segIdx}.wav" val wavPath = "${context?.cacheDir?.absolutePath ?: "/data/local/tmp/kazeia"}/tts_seg_${segIdx}.wav"
saveWav(wavPath, audio) saveWav(wavPath, audio)
nlog("MP seg $segIdx synthesized (${System.currentTimeMillis() - tSynth}ms), queued for playback") val durationMs = audio.size * 1000L / SR
wavChan.send(segIdx to wavPath) nlog("MP seg $segIdx synthesized (${System.currentTimeMillis() - tSynth}ms, ${durationMs}ms audio), queued for playback")
wavChan.send(SegmentReady(segIdx, wavPath, sentence, durationMs))
} catch (e: Exception) { } catch (e: Exception) {
nlog("MP synth error: ${e.message}") nlog("MP synth error: ${e.message}")
} }
@ -3424,7 +3437,7 @@ class Qwen3TtsEngine(
* channel closes AND the final segment finishes. * channel closes AND the final segment finishes.
*/ */
private suspend fun playChainedMediaPlayers( private suspend fun playChainedMediaPlayers(
wavChan: kotlinx.coroutines.channels.ReceiveChannel<Pair<Int, String>> wavChan: kotlinx.coroutines.channels.ReceiveChannel<SegmentReady>
) { ) {
val attrs = android.media.AudioAttributes.Builder() val attrs = android.media.AudioAttributes.Builder()
.setUsage(android.media.AudioAttributes.USAGE_MEDIA) .setUsage(android.media.AudioAttributes.USAGE_MEDIA)
@ -3451,18 +3464,19 @@ class Qwen3TtsEngine(
} }
var current: android.media.MediaPlayer? = null var current: android.media.MediaPlayer? = null
var currentInfo: Pair<Int, String>? = null var currentInfo: SegmentReady? = null
var next: android.media.MediaPlayer? = null var next: android.media.MediaPlayer? = null
var nextInfo: Pair<Int, String>? = null var nextInfo: SegmentReady? = null
try { try {
// Bootstrap: wait for first WAV. // Bootstrap: wait for first WAV.
val first = wavChan.receiveCatching().getOrNull() ?: return val first = wavChan.receiveCatching().getOrNull() ?: return
currentInfo = first currentInfo = first
current = prepareMp(first.second, first.first) current = prepareMp(first.wavPath, first.segIdx)
current!!.setOnCompletionListener { it.release() } current!!.setOnCompletionListener { it.release() }
current!!.start() current!!.start()
nlog("MP seg ${first.first} started (chained)") try { onSegmentPlaying?.invoke(first.sentence, first.durationMs) } catch (_: Exception) {}
nlog("MP seg ${first.segIdx} started (chained, ${first.durationMs}ms)")
while (true) { while (true) {
// Fetch next WAV (may block). If channel closes, let // Fetch next WAV (may block). If channel closes, let
@ -3470,24 +3484,28 @@ class Qwen3TtsEngine(
val upcoming = wavChan.receiveCatching().getOrNull() val upcoming = wavChan.receiveCatching().getOrNull()
if (upcoming == null) break if (upcoming == null) break
nextInfo = upcoming nextInfo = upcoming
next = prepareMp(upcoming.second, upcoming.first) next = prepareMp(upcoming.wavPath, upcoming.segIdx)
current!!.setNextMediaPlayer(next) current!!.setNextMediaPlayer(next)
nlog("MP seg ${upcoming.first} queued as next") nlog("MP seg ${upcoming.segIdx} queued as next")
// Wait for current seg to finish playing before rotating. // Wait for current seg to finish playing before rotating.
val prevFile = currentInfo!!.second val prevFile = currentInfo!!.wavPath
waitForPlaybackCompletion(current!!, currentInfo!!.first) waitForPlaybackCompletion(current!!, currentInfo!!.segIdx)
try { java.io.File(prevFile).delete() } catch (_: Exception) {} try { java.io.File(prevFile).delete() } catch (_: Exception) {}
current = next current = next
currentInfo = nextInfo currentInfo = nextInfo
// `next` player was chained via setNextMediaPlayer and has
// auto-started at this point; notify the UI so it can start
// revealing the sentence in sync with the audio.
try { onSegmentPlaying?.invoke(currentInfo!!.sentence, currentInfo!!.durationMs) } catch (_: Exception) {}
next = null next = null
nextInfo = null nextInfo = null
} }
// Drain: wait for the last prepared player to finish. // Drain: wait for the last prepared player to finish.
if (current != null && currentInfo != null) { if (current != null && currentInfo != null) {
waitForPlaybackCompletion(current!!, currentInfo!!.first) waitForPlaybackCompletion(current!!, currentInfo!!.segIdx)
try { java.io.File(currentInfo!!.second).delete() } catch (_: Exception) {} try { java.io.File(currentInfo!!.wavPath).delete() } catch (_: Exception) {}
} }
} catch (e: Exception) { } catch (e: Exception) {
nlog("MP playback chain error: ${e.message}") nlog("MP playback chain error: ${e.message}")
@ -3497,6 +3515,16 @@ class Qwen3TtsEngine(
} }
} }
/** Payload handed from the synth worker to the playback worker so
* the UI can be notified with matching text + duration when each
* segment starts playing. */
private data class SegmentReady(
val segIdx: Int,
val wavPath: String,
val sentence: String,
val durationMs: Long
)
private suspend fun waitForPlaybackCompletion( private suspend fun waitForPlaybackCompletion(
mp: android.media.MediaPlayer, segIdx: Int mp: android.media.MediaPlayer, segIdx: Int
) { ) {
@ -3520,6 +3548,7 @@ class Qwen3TtsEngine(
chan.close() chan.close()
try { sessionMpJob?.join() } catch (_: Exception) {} try { sessionMpJob?.join() } catch (_: Exception) {}
sessionMpQueue = null; sessionMpJob = null sessionMpQueue = null; sessionMpJob = null
onSegmentPlaying = null
nlog("streaming session closed (MediaPlayer fallback)") nlog("streaming session closed (MediaPlayer fallback)")
} }