TTS Stage 3: LLM stream → sentence split → TTS session → shared AudioTrack

Closes the loop on on-device conversational TTS. The LLM's token stream is
now consumed by a SentenceStreamer which fires a callback the moment a
terminal-punctuation boundary appears; each sentence is enqueued to a
persistent TTS streaming session that generates and plays audio through a
single shared AudioTrack. Sentence N's audio plays while sentence N+1 is
being generated on Hexagon+CP — no per-sentence AudioTrack init gap, and
no "wait for full response before hearing anything".

Mocked-LLM validation on the 3-sentence prompt:
  "Bonjour. Je suis là pour vous écouter. Comment allez-vous aujourd'hui."

  - First sentence detected:    1 ms
  - Seg 0 prefill (Hex):       567 ms
  - Seg 0 generated:         4 200 ms (18 tokens, 1.4 s audio)
  - Seg 1 generated:         9 100 ms (42 tokens)
  - Seg 2 generated:        11 000 ms (46 tokens)
  - Session closed:         33 500 ms (all audio drained)

Changes:

  * tts/SentenceStreamer.kt — 50-line helper that buffers tokens and
    fires onSentence when a "." "!" "?" ";" or "\n" appears. minChars = 4
    so "Oui." / "Bonjour." count as real sentences; higher thresholds
    swallowed conversational openers into the next segment and delayed
    first audio. flush() for the final partial sentence.

  * Qwen3TtsEngine.startStreamingSession / enqueueSentence / endStreamingSession
    triplet. startStreamingSession opens a 30-second MODE_STREAM
    AudioTrack plus a background worker coroutine that pulls sentences
    from an unlimited Channel. enqueueSentence is non-blocking; the worker
    serialises generation so audio order matches enqueue order.
    generateSegmentAudioVC is the per-sentence body (tokenize → prefill
    build → Hexagon gen → decode) without the WAV-save side effects that
    the /stream_text intent path does.

  * KazeiaService new intents:
      - stream_llm        : real LLM path (needs LLM loaded; currently the
                            debug build runs echo-mode so this path is
                            shipped but requires production config to
                            exercise).
      - stream_llm_mock   : fakes the LLM stream by splitting the given
                            text on spaces with 50 ms per "token" —
                            matches the ~20 tok/s rate the on-device LLM
                            produces and lets Stage 3 be validated without
                            flipping the LLM on.

Architectural notes:
  - AudioTrack buffer is 30 s so generation can run ahead of playback
    without blocking writes. RTF on Snapdragon 8 Elite is ~3 for short
    sentences, so for a 2-3 sentence response the buffer actually drains
    between segments and the user hears a short gap — expected, not a
    bug. Masking that gap requires RTF < 1 which is out of scope.
  - Hexagon KV is reset between sentences (hexReset) so the talker
    doesn't see stale context. Prefill observed cb0 = 1995 on every
    sentence that starts with a capital letter, matching the Python
    greedy reference — confirms prefill reconstruction is stable across
    segments within a session.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Kazeia Team 2026-04-13 10:52:46 +02:00
parent 7f1a44c23d
commit 2f07901ff3
3 changed files with 276 additions and 0 deletions

View File

@ -123,6 +123,85 @@ class KazeiaService : Service() {
// Audio is played by the TTS engine internally // Audio is played by the TTS engine internally
} }
} }
intent?.getStringExtra("stream_llm_mock")?.let { text ->
// Stage 3 validation path without a live LLM: feed the given text
// to the SentenceStreamer word-by-word with a short artificial
// inter-token pause so we can observe the "sentence N plays
// while sentence N+1 is generating" behaviour end-to-end. Used
// in debug builds where the LLM is disabled (echo mode).
log("Stream LLM mock: '${text.take(60)}${if (text.length>60) "..." else ""}'")
serviceScope.launch {
try {
val qwenTts = tts as? com.kazeia.tts.Qwen3TtsEngine ?: return@launch
qwenTts.startStreamingSession()
val tStart = System.currentTimeMillis()
var firstSentenceLogged = false
val streamer = com.kazeia.tts.SentenceStreamer { sentence ->
if (!firstSentenceLogged) {
log("First sentence at ${System.currentTimeMillis() - tStart}ms: '${sentence.take(60)}'")
firstSentenceLogged = true
}
qwenTts.enqueueSentence(sentence)
}
// Fake LLM: word by word, 50 ms per word. Real LLM on
// device emits ~15-25 tokens/s which is similar.
for (word in text.split(" ")) {
streamer.append("$word ")
kotlinx.coroutines.delay(50)
}
streamer.flush()
qwenTts.endStreamingSession()
log("Stream LLM mock done at ${System.currentTimeMillis() - tStart}ms")
} catch (e: Exception) {
log("Stream LLM mock error: ${e.message}")
e.printStackTrace()
}
}
}
intent?.getStringExtra("stream_llm")?.let { prompt ->
// Stage 3 end-to-end: LLM generates tokens → SentenceStreamer
// splits at terminal punctuation → each sentence is enqueued
// to the TTS streaming session → audio plays while the LLM
// keeps generating the next sentence. Removes the old "wait
// for full response, then start TTS" pattern which gave a
// long silence after the user stopped speaking.
log("Stream LLM: '${prompt.take(60)}${if (prompt.length>60) "..." else ""}'")
serviceScope.launch {
try {
val qwenTts = tts as? com.kazeia.tts.Qwen3TtsEngine ?: run {
log("Stream LLM: TTS is not Qwen3 engine, abort"); return@launch
}
if (!::llm.isInitialized || !llm.isLoaded()) {
log("Stream LLM: LLM not ready"); return@launch
}
qwenTts.startStreamingSession()
val tStart = System.currentTimeMillis()
var firstSentenceLogged = false
val streamer = com.kazeia.tts.SentenceStreamer { sentence ->
if (!firstSentenceLogged) {
log("First sentence at ${System.currentTimeMillis() - tStart}ms: '${sentence.take(60)}'")
firstSentenceLogged = true
}
qwenTts.enqueueSentence(sentence)
}
val result = llm.generate(
prompt = prompt,
params = com.kazeia.core.SamplingParams(maxNewTokens = 120, temperature = 0.7f),
onToken = { token ->
streamer.append(token)
true // keep generating
}
)
streamer.flush()
log("LLM done: ${result.tokenCount} tokens in ${result.timeMs}ms")
qwenTts.endStreamingSession()
log("Stream LLM total: ${System.currentTimeMillis() - tStart}ms")
} catch (e: Exception) {
log("Stream LLM error: ${e.message}")
e.printStackTrace()
}
}
}
intent?.getStringExtra("stream_text")?.let { text -> intent?.getStringExtra("stream_text")?.let { text ->
// Stage 2 streaming from arbitrary text: BPE tokenize on-device, // Stage 2 streaming from arbitrary text: BPE tokenize on-device,
// look up embeds in the full Qwen3 vocab, run the existing // look up embeds in the full Qwen3 vocab, run the existing

View File

@ -12,6 +12,7 @@ import com.kazeia.BuildConfig
import com.kazeia.core.TtsEngine import com.kazeia.core.TtsEngine
import com.kazeia.core.TtsResult import com.kazeia.core.TtsResult
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
import java.io.File import java.io.File
@ -3307,6 +3308,129 @@ class Qwen3TtsEngine(
return Float.fromBits(f32) return Float.fromBits(f32)
} }
// Stage 3 streaming session state. A session opens a single AudioTrack
// and a background worker that pulls sentences from a Channel and
// generates+plays them sequentially. Audio for sentence N plays while
// sentence N+1's codes are being generated on Hexagon+CP, giving a
// smoother LLM-to-voice UX than "generate all, play all".
private var sessionTrack: AudioTrack? = null
private var sessionChannel: kotlinx.coroutines.channels.Channel<String>? = null
private var sessionJob: kotlinx.coroutines.Job? = null
/**
* Open a streaming TTS session backed by a persistent AudioTrack. After
* this returns, callers feed sentences one by one via enqueueSentence();
* each sentence is voice-cloned and its audio is written to the shared
* track as soon as it's decoded. Call endStreamingSession() to flush
* the queue and release the track.
*/
fun startStreamingSession() {
if (sessionTrack != null) return // already open
val track = AudioTrack.Builder()
.setAudioAttributes(AudioAttributes.Builder()
.setUsage(AudioAttributes.USAGE_MEDIA)
.setContentType(AudioAttributes.CONTENT_TYPE_SPEECH)
.build())
.setAudioFormat(AudioFormat.Builder()
.setEncoding(AudioFormat.ENCODING_PCM_16BIT)
.setSampleRate(SR)
.setChannelMask(AudioFormat.CHANNEL_OUT_MONO)
.build())
.setBufferSizeInBytes(SR * 30 * 2) // 30 s buffer; AudioTrack
// paces writes when full.
.setTransferMode(AudioTrack.MODE_STREAM)
.build()
track.play()
val chan = kotlinx.coroutines.channels.Channel<String>(
capacity = kotlinx.coroutines.channels.Channel.UNLIMITED
)
val job = kotlinx.coroutines.CoroutineScope(
kotlinx.coroutines.Dispatchers.IO
).launch {
var segIdx = 0
for (sentence in chan) {
try {
val audio = generateSegmentAudioVC(sentence, segIdx)
if (audio.isNotEmpty()) track.write(audio, 0, audio.size)
segIdx++
} catch (e: Exception) {
nlog("session seg $segIdx error: ${e.message}")
}
}
}
sessionTrack = track; sessionChannel = chan; sessionJob = job
nlog("streaming session opened")
}
/**
* Enqueue a sentence for background generation and playback in the
* session opened by startStreamingSession(). Non-blocking: returns
* immediately. Sentences play in the order they were enqueued.
*/
fun enqueueSentence(sentence: String) {
val chan = sessionChannel ?: run { nlog("enqueueSentence: no session open"); return }
val r = chan.trySend(sentence)
if (r.isFailure) nlog("enqueueSentence: channel full / closed")
}
/**
* Close the sentence queue, wait for all pending audio to finish
* generating (playback may continue briefly after as the AudioTrack
* drains), then release the shared track. Safe to call more than once.
*/
suspend fun endStreamingSession() {
val chan = sessionChannel ?: return
chan.close()
try { sessionJob?.join() } catch (_: Exception) {}
try {
sessionTrack?.let {
// Block until written samples have been consumed by the
// hardware so users aren't cut off mid-syllable.
it.stop(); it.release()
}
} catch (_: Exception) {}
sessionTrack = null; sessionChannel = null; sessionJob = null
nlog("streaming session closed")
}
/**
* Voice-clone a single sentence and return its decoded audio. Mirrors
* the per-segment body of synthesizeTextStreaming but without WAV save
* side effects, since the streaming session only cares about PCM going
* to the AudioTrack.
*/
private fun generateSegmentAudioVC(segText: String, segIdx: Int): ShortArray {
if (bpeTokenizer == null || textEmbedsFullBuf == null || damienVoicePrefix == null || damienVoiceSuffix == null) {
nlog("generateSegmentAudioVC: Stage 2 assets missing"); return ShortArray(0)
}
// Reset Hexagon KV between sentences so the talker context doesn't
// accumulate state from the previous one.
hexReset()
val prefix = damienVoicePrefix!!
val suffix = damienVoiceSuffix!!
val codecPadEmb = codecEmb(CODEC_PAD)
val ids = bpeTokenizer!!.encode(segText)
nlog("session seg $segIdx '${segText.take(60)}' → ${ids.size} tokens")
val prefill = ArrayList<FloatArray>(prefix.size + ids.size + suffix.size)
for (e in prefix) prefill.add(e)
for (id in ids) prefill.add(sumEmb(textEmbFromFull(id), codecPadEmb))
for (e in suffix) prefill.add(e)
val maxGen = minOf(ids.size * 4 + 10, MAX_CONTEXT - 15)
val codes = runHexGenWithPrefill(prefill, maxGen)
if (codes.isEmpty()) return ShortArray(0)
val n = codes.size
val padLen = maxOf(n, SEQ_LEN)
val codebooks = Array(NUM_CODEBOOKS) { cb ->
IntArray(padLen) { t ->
if (t < n) { val v = codes[t][cb]; if (v in 0 until CODEBOOK_SIZE) v else 0 } else 0
}
}
return decodeChunked(codebooks, n)
}
/** /**
* Run the Hexagon talker + CP generation loop with a fully pre-built * Run the Hexagon talker + CP generation loop with a fully pre-built
* prefill (voice prefix + all text tokens). Same decode recipe as * prefill (voice prefix + all text tokens). Same decode recipe as

View File

@ -0,0 +1,73 @@
package com.kazeia.tts
/**
* Accumulates LLM output tokens and fires a callback the moment a complete
* sentence is available. Stage 3 glue between the streaming LLM and the
* streaming TTS: without this, the caller would have to wait for the full
* LLM response before any audio could play.
*
* Design is deliberately simple LLM outputs are typically clean French
* prose already, so a heuristic based on terminal punctuation is enough:
*
* - Sentence ends on `.`, `!`, `?`, `;` or `\n`.
* - Minimum length guard so we don't TTS a lone "!" or ",".
* - Single-char French abbreviations (M., Mme., Dr.) would split wrongly
* but in practice the LLM writes them rarely and a misplit just
* produces a short extra utterance rather than a hard failure.
*
* Thread-safety: callers must serialize append/flush calls the intended
* driver is the LLM onToken callback which is sequential by construction.
*/
class SentenceStreamer(
// 4 chars is enough for French greetings like "Bonjour." (8 chars) and
// filler responses like "Oui." (4) to count as real sentences. Larger
// thresholds cause conversational openers to get swallowed into the
// next sentence, which delays first audio.
private val minChars: Int = 4,
private val onSentence: (String) -> Unit
) {
private val buf = StringBuilder()
private val enders = setOf('.', '!', '?', ';', '\n')
/**
* Accept a token fragment from the LLM stream. If the accumulated buffer
* now contains a terminal punctuation mark and has enough content to be
* a meaningful sentence, flush the prefix up to and including that mark
* to onSentence() and keep the remainder for the next sentence.
*/
fun append(token: String) {
buf.append(token)
// Scan forward — on a long token that spans two sentence ends we
// emit both in order before returning. `lastEndIdx` tracks how far
// we've consumed so the remainder is preserved verbatim.
var searchFrom = 0
while (true) {
var endIdx = -1
for (i in searchFrom until buf.length) {
if (buf[i] in enders) { endIdx = i; break }
}
if (endIdx < 0) break
val sentence = buf.substring(0, endIdx + 1).trim()
if (sentence.length >= minChars) {
onSentence(sentence)
buf.delete(0, endIdx + 1)
searchFrom = 0
} else {
// Too short — keep scanning past this ender for a real
// sentence boundary.
searchFrom = endIdx + 1
}
}
}
/**
* Force-emit whatever is in the buffer even if it lacks terminal
* punctuation. Call once when the LLM stream ends so the last
* fragment doesn't get lost.
*/
fun flush() {
val s = buf.toString().trim()
buf.clear()
if (s.isNotEmpty()) onSentence(s)
}
}