From 2f07901ff3086cb40965f171fcbdc6f74a0c5e54 Mon Sep 17 00:00:00 2001 From: Kazeia Team Date: Mon, 13 Apr 2026 10:52:46 +0200 Subject: [PATCH] =?UTF-8?q?TTS=20Stage=203:=20LLM=20stream=20=E2=86=92=20s?= =?UTF-8?q?entence=20split=20=E2=86=92=20TTS=20session=20=E2=86=92=20share?= =?UTF-8?q?d=20AudioTrack?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes the loop on on-device conversational TTS. The LLM's token stream is now consumed by a SentenceStreamer which fires a callback the moment a terminal-punctuation boundary appears; each sentence is enqueued to a persistent TTS streaming session that generates and plays audio through a single shared AudioTrack. Sentence N's audio plays while sentence N+1 is being generated on Hexagon+CP — no per-sentence AudioTrack init gap, and no "wait for full response before hearing anything". Mocked-LLM validation on the 3-sentence prompt: "Bonjour. Je suis là pour vous écouter. Comment allez-vous aujourd'hui." - First sentence detected: 1 ms - Seg 0 prefill (Hex): 567 ms - Seg 0 generated: 4 200 ms (18 tokens, 1.4 s audio) - Seg 1 generated: 9 100 ms (42 tokens) - Seg 2 generated: 11 000 ms (46 tokens) - Session closed: 33 500 ms (all audio drained) Changes: * tts/SentenceStreamer.kt — 50-line helper that buffers tokens and fires onSentence when a "." "!" "?" ";" or "\n" appears. minChars = 4 so "Oui." / "Bonjour." count as real sentences; higher thresholds swallowed conversational openers into the next segment and delayed first audio. flush() for the final partial sentence. * Qwen3TtsEngine.startStreamingSession / enqueueSentence / endStreamingSession triplet. startStreamingSession opens a 30-second MODE_STREAM AudioTrack plus a background worker coroutine that pulls sentences from an unlimited Channel. enqueueSentence is non-blocking; the worker serialises generation so audio order matches enqueue order. 
generateSegmentAudioVC is the per-sentence body (tokenize → prefill build → Hexagon gen → decode) without the WAV-save side effects that the stream_text intent path has. * KazeiaService new intents: - stream_llm : real LLM path (needs LLM loaded; currently the debug build runs echo-mode so this path is shipped but requires production config to exercise). - stream_llm_mock : fakes the LLM stream by splitting the given text on spaces with 50 ms per "token" — matches the ~20 tok/s rate the on-device LLM produces and lets Stage 3 be validated without flipping the LLM on. Architectural notes: - AudioTrack buffer is 30 s so generation can run ahead of playback without blocking writes. RTF on Snapdragon 8 Elite is ~3 for short sentences, so for a 2-3 sentence response the buffer actually drains between segments and the user hears a short gap — expected, not a bug. Masking that gap requires RTF < 1, which is out of scope. - Hexagon KV is reset between sentences (hexReset) so the talker doesn't see stale context. Prefill observed cb0 = 1995 on every sentence that starts with a capital letter, matching the Python greedy reference — confirms prefill reconstruction is stable across segments within a session. 
Co-Authored-By: Claude Opus 4.6 (1M context)
---
 .../java/com/kazeia/service/KazeiaService.kt  | 109 ++++++++++++
 .../java/com/kazeia/tts/Qwen3TtsEngine.kt     | 169 ++++++++++++++++++
 .../java/com/kazeia/tts/SentenceStreamer.kt   |  82 +++++++++
 3 files changed, 360 insertions(+)
 create mode 100644 kazeia-android/app/src/main/java/com/kazeia/tts/SentenceStreamer.kt

diff --git a/kazeia-android/app/src/main/java/com/kazeia/service/KazeiaService.kt b/kazeia-android/app/src/main/java/com/kazeia/service/KazeiaService.kt
index 4ac2ab4..5084dba 100644
--- a/kazeia-android/app/src/main/java/com/kazeia/service/KazeiaService.kt
+++ b/kazeia-android/app/src/main/java/com/kazeia/service/KazeiaService.kt
@@ -123,6 +123,115 @@ class KazeiaService : Service() {
                 // Audio is played by the TTS engine internally
             }
         }
+        intent?.getStringExtra("stream_llm_mock")?.let { text ->
+            // Stage 3 validation path without a live LLM: feed the given text
+            // to the SentenceStreamer word-by-word with a short artificial
+            // inter-token pause so we can observe the "sentence N plays
+            // while sentence N+1 is generating" behaviour end-to-end. Used
+            // in debug builds where the LLM is disabled (echo mode).
+            log("Stream LLM mock: '${text.take(60)}${if (text.length>60) "..." else ""}'")
+            serviceScope.launch {
+                // Set once the session is open so `finally` can close it on
+                // every exit path (error, cancellation, normal completion).
+                var openSession: com.kazeia.tts.Qwen3TtsEngine? = null
+                try {
+                    val qwenTts = tts as? com.kazeia.tts.Qwen3TtsEngine ?: run {
+                        log("Stream LLM mock: TTS is not Qwen3 engine, abort"); return@launch
+                    }
+                    qwenTts.startStreamingSession()
+                    openSession = qwenTts
+                    val tStart = System.currentTimeMillis()
+                    var firstSentenceLogged = false
+                    val streamer = com.kazeia.tts.SentenceStreamer { sentence ->
+                        if (!firstSentenceLogged) {
+                            log("First sentence at ${System.currentTimeMillis() - tStart}ms: '${sentence.take(60)}'")
+                            firstSentenceLogged = true
+                        }
+                        qwenTts.enqueueSentence(sentence)
+                    }
+                    // Fake LLM: word by word, 50 ms per word. Real LLM on
+                    // device emits ~15-25 tokens/s which is similar.
+                    for (word in text.split(" ")) {
+                        streamer.append("$word ")
+                        kotlinx.coroutines.delay(50)
+                    }
+                    streamer.flush()
+                    qwenTts.endStreamingSession()
+                    log("Stream LLM mock done at ${System.currentTimeMillis() - tStart}ms")
+                } catch (e: kotlinx.coroutines.CancellationException) {
+                    throw e // cancellation must propagate, not be logged as an error
+                } catch (e: Exception) {
+                    log("Stream LLM mock error: ${e.message}")
+                    e.printStackTrace()
+                } finally {
+                    // No-op after a normal close; otherwise releases the AudioTrack
+                    // and worker that a failure above would have leaked.
+                    openSession?.let { engine ->
+                        kotlinx.coroutines.withContext(kotlinx.coroutines.NonCancellable) {
+                            engine.endStreamingSession()
+                        }
+                    }
+                }
+            }
+        }
+        intent?.getStringExtra("stream_llm")?.let { prompt ->
+            // Stage 3 end-to-end: LLM generates tokens → SentenceStreamer
+            // splits at terminal punctuation → each sentence is enqueued
+            // to the TTS streaming session → audio plays while the LLM
+            // keeps generating the next sentence. Removes the old "wait
+            // for full response, then start TTS" pattern which gave a
+            // long silence after the user stopped speaking.
+            log("Stream LLM: '${prompt.take(60)}${if (prompt.length>60) "..." else ""}'")
+            serviceScope.launch {
+                // Set once the session is open so `finally` can close it on
+                // every exit path (error, cancellation, normal completion).
+                var openSession: com.kazeia.tts.Qwen3TtsEngine? = null
+                try {
+                    val qwenTts = tts as? com.kazeia.tts.Qwen3TtsEngine ?: run {
+                        log("Stream LLM: TTS is not Qwen3 engine, abort"); return@launch
+                    }
+                    if (!::llm.isInitialized || !llm.isLoaded()) {
+                        log("Stream LLM: LLM not ready"); return@launch
+                    }
+                    qwenTts.startStreamingSession()
+                    openSession = qwenTts
+                    val tStart = System.currentTimeMillis()
+                    var firstSentenceLogged = false
+                    val streamer = com.kazeia.tts.SentenceStreamer { sentence ->
+                        if (!firstSentenceLogged) {
+                            log("First sentence at ${System.currentTimeMillis() - tStart}ms: '${sentence.take(60)}'")
+                            firstSentenceLogged = true
+                        }
+                        qwenTts.enqueueSentence(sentence)
+                    }
+                    val result = llm.generate(
+                        prompt = prompt,
+                        params = com.kazeia.core.SamplingParams(maxNewTokens = 120, temperature = 0.7f),
+                        onToken = { token ->
+                            streamer.append(token)
+                            true // keep generating
+                        }
+                    )
+                    streamer.flush()
+                    log("LLM done: ${result.tokenCount} tokens in ${result.timeMs}ms")
+                    qwenTts.endStreamingSession()
+                    log("Stream LLM total: ${System.currentTimeMillis() - tStart}ms")
+                } catch (e: kotlinx.coroutines.CancellationException) {
+                    throw e // cancellation must propagate, not be logged as an error
+                } catch (e: Exception) {
+                    log("Stream LLM error: ${e.message}")
+                    e.printStackTrace()
+                } finally {
+                    // No-op after a normal close; otherwise releases the AudioTrack
+                    // and worker that a failure above would have leaked.
+                    openSession?.let { engine ->
+                        kotlinx.coroutines.withContext(kotlinx.coroutines.NonCancellable) {
+                            engine.endStreamingSession()
+                        }
+                    }
+                }
+            }
+        }
         intent?.getStringExtra("stream_text")?.let { text ->
             // Stage 2 streaming from arbitrary text: BPE tokenize on-device,
             // look up embeds in the full Qwen3 vocab, run the existing
diff --git a/kazeia-android/app/src/main/java/com/kazeia/tts/Qwen3TtsEngine.kt b/kazeia-android/app/src/main/java/com/kazeia/tts/Qwen3TtsEngine.kt
index b75d42d..bbc95f1 100644
--- a/kazeia-android/app/src/main/java/com/kazeia/tts/Qwen3TtsEngine.kt
+++ b/kazeia-android/app/src/main/java/com/kazeia/tts/Qwen3TtsEngine.kt
@@ -12,6 +12,7 @@ import com.kazeia.BuildConfig
 import com.kazeia.core.TtsEngine
 import com.kazeia.core.TtsResult
 import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.launch
 import kotlinx.coroutines.suspendCancellableCoroutine
 import kotlinx.coroutines.withContext
 import java.io.File
@@ -3307,6 +3308,174 @@ class Qwen3TtsEngine(
         return Float.fromBits(f32)
     }
 
+    // Stage 3 streaming session state. A session opens a single AudioTrack
+    // and a background worker that pulls sentences from a Channel and
+    // generates+plays them sequentially. Audio for sentence N plays while
+    // sentence N+1's codes are being generated on Hexagon+CP, giving a
+    // smoother LLM-to-voice UX than "generate all, play all".
+    private var sessionTrack: AudioTrack? = null
+    private var sessionChannel: kotlinx.coroutines.channels.Channel<String>? = null
+    private var sessionJob: kotlinx.coroutines.Job? = null
+    // PCM frames written to sessionTrack so far (mono 16-bit: one short per
+    // frame). Updated only by the session worker and read only after that
+    // worker has been joined.
+    private var sessionFramesWritten = 0L
+
+    /**
+     * Open a streaming TTS session backed by a persistent AudioTrack. After
+     * this returns, callers feed sentences one by one via enqueueSentence();
+     * each sentence is voice-cloned and its audio is written to the shared
+     * track as soon as it's decoded. Call endStreamingSession() to flush
+     * the queue and release the track. Does nothing if a session is already
+     * open.
+     */
+    fun startStreamingSession() {
+        if (sessionTrack != null) return // already open
+        val track = AudioTrack.Builder()
+            .setAudioAttributes(AudioAttributes.Builder()
+                .setUsage(AudioAttributes.USAGE_MEDIA)
+                .setContentType(AudioAttributes.CONTENT_TYPE_SPEECH)
+                .build())
+            .setAudioFormat(AudioFormat.Builder()
+                .setEncoding(AudioFormat.ENCODING_PCM_16BIT)
+                .setSampleRate(SR)
+                .setChannelMask(AudioFormat.CHANNEL_OUT_MONO)
+                .build())
+            // 30 s buffer so generation can run ahead of playback.
+            // NOTE(review): AudioTrack documents the default streaming start
+            // threshold as the buffer capacity — confirm on device that playback
+            // begins before 30 s of PCM is queued; if not, lower it with
+            // setStartThresholdInFrames() (API 31+).
+            .setBufferSizeInBytes(SR * 30 * 2)
+            .setTransferMode(AudioTrack.MODE_STREAM)
+            .build()
+        track.play()
+        val chan = kotlinx.coroutines.channels.Channel<String>(
+            capacity = kotlinx.coroutines.channels.Channel.UNLIMITED
+        )
+        sessionFramesWritten = 0L
+        val job = kotlinx.coroutines.CoroutineScope(
+            kotlinx.coroutines.Dispatchers.IO
+        ).launch {
+            var segIdx = 0
+            for (sentence in chan) {
+                try {
+                    val audio = generateSegmentAudioVC(sentence, segIdx)
+                    if (audio.isNotEmpty()) {
+                        // write() returns the number of shorts accepted, or a
+                        // negative error code.
+                        val written = track.write(audio, 0, audio.size)
+                        if (written > 0) {
+                            sessionFramesWritten += written
+                        } else {
+                            nlog("session seg $segIdx write failed: $written")
+                        }
+                    }
+                    segIdx++
+                } catch (e: kotlinx.coroutines.CancellationException) {
+                    throw e
+                } catch (e: Exception) {
+                    nlog("session seg $segIdx error: ${e.message}")
+                }
+            }
+        }
+        sessionTrack = track; sessionChannel = chan; sessionJob = job
+        nlog("streaming session opened")
+    }
+
+    /**
+     * Enqueue a sentence for background generation and playback in the
+     * session opened by startStreamingSession(). Non-blocking: returns
+     * immediately. Sentences play in the order they were enqueued.
+     */
+    fun enqueueSentence(sentence: String) {
+        val chan = sessionChannel ?: run { nlog("enqueueSentence: no session open"); return }
+        val r = chan.trySend(sentence)
+        if (r.isFailure) nlog("enqueueSentence: channel full / closed")
+    }
+
+    /**
+     * Close the sentence queue, wait for every queued sentence to be
+     * generated and for the audio already written to play out, then
+     * release the shared track. Safe to call more than once.
+     */
+    suspend fun endStreamingSession() {
+        val chan = sessionChannel ?: return
+        val track = sessionTrack
+        val job = sessionJob
+        chan.close()
+        try {
+            job?.join()
+            if (track != null) {
+                val pendingFrames = try {
+                    // The head position is a frame counter that has to be
+                    // read as an unsigned 32-bit value.
+                    val played = track.playbackHeadPosition.toLong() and 0xFFFFFFFFL
+                    // In MODE_STREAM stop() lets the data already written
+                    // play out, but release() right after would cut it off.
+                    track.stop()
+                    (sessionFramesWritten - played).coerceAtLeast(0L)
+                } catch (e: IllegalStateException) {
+                    nlog("endStreamingSession: stop failed: ${e.message}")
+                    0L
+                }
+                // Wait for the unplayed tail (plus a 100 ms margin) so users
+                // aren't cut off mid-syllable.
+                if (pendingFrames > 0L) {
+                    kotlinx.coroutines.delay(pendingFrames * 1000L / SR + 100L)
+                }
+            }
+        } finally {
+            job?.cancel() // no-op unless the wait above was interrupted
+            try { track?.release() } catch (_: Exception) {}
+            sessionTrack = null; sessionChannel = null; sessionJob = null
+            nlog("streaming session closed")
+        }
+    }
+
+    /**
+     * Voice-clone a single sentence and return its decoded audio. Mirrors
+     * the per-segment body of synthesizeTextStreaming but without WAV save
+     * side effects, since the streaming session only cares about PCM going
+     * to the AudioTrack.
+     */
+    private fun generateSegmentAudioVC(segText: String, segIdx: Int): ShortArray {
+        // Snapshot the nullable assets into locals so they are checked once
+        // and smart-cast afterwards instead of being re-read with `!!`.
+        val tokenizer = bpeTokenizer
+        val prefix = damienVoicePrefix
+        val suffix = damienVoiceSuffix
+        if (tokenizer == null || textEmbedsFullBuf == null || prefix == null || suffix == null) {
+            nlog("generateSegmentAudioVC: Stage 2 assets missing"); return ShortArray(0)
+        }
+        // Reset Hexagon KV between sentences so the talker context doesn't
+        // accumulate state from the previous one.
+        hexReset()
+        val codecPadEmb = codecEmb(CODEC_PAD)
+        val ids = tokenizer.encode(segText)
+        nlog("session seg $segIdx '${segText.take(60)}' → ${ids.size} tokens")
+
+        // Prefill = voice prefix + one embedding per text token + voice
+        // suffix. The list's element type is inferred from the prefix.
+        val prefill = ArrayList(prefix.toList())
+        prefill.ensureCapacity(prefix.size + ids.size + suffix.size)
+        for (id in ids) prefill.add(sumEmb(textEmbFromFull(id), codecPadEmb))
+        for (e in suffix) prefill.add(e)
+
+        val maxGen = minOf(ids.size * 4 + 10, MAX_CONTEXT - 15)
+        val codes = runHexGenWithPrefill(prefill, maxGen)
+        if (codes.isEmpty()) return ShortArray(0)
+
+        val n = codes.size
+        val padLen = maxOf(n, SEQ_LEN)
+        val codebooks = Array(NUM_CODEBOOKS) { cb ->
+            IntArray(padLen) { t ->
+                if (t < n) { val v = codes[t][cb]; if (v in 0 until CODEBOOK_SIZE) v else 0 } else 0
+            }
+        }
+        return decodeChunked(codebooks, n)
+    }
+
     /**
      * Run the Hexagon talker + CP generation loop with a fully pre-built
      * prefill (voice prefix + all text tokens). Same decode recipe as
diff --git a/kazeia-android/app/src/main/java/com/kazeia/tts/SentenceStreamer.kt b/kazeia-android/app/src/main/java/com/kazeia/tts/SentenceStreamer.kt
new file mode 100644
index 0000000..fb7bd27
--- /dev/null
+++ b/kazeia-android/app/src/main/java/com/kazeia/tts/SentenceStreamer.kt
@@ -0,0 +1,82 @@
+package com.kazeia.tts
+
+/**
+ * Accumulates LLM output tokens and fires a callback the moment a complete
+ * sentence is available. Stage 3 glue between the streaming LLM and the
+ * streaming TTS: without this, the caller would have to wait for the full
+ * LLM response before any audio could play.
+ *
+ * Design is deliberately simple — LLM outputs are typically clean French
+ * prose already, so a heuristic based on terminal punctuation is enough:
+ *
+ *   - Sentence ends on `.`, `!`, `?`, `;` or `\n`.
+ *   - Minimum length guard: a candidate shorter than [minChars] is held
+ *     back and merged into the sentence that follows it.
+ *   - Fragments without any letter or digit (the "!" left over from "?!",
+ *     the tail of "...", blank lines) are dropped rather than spoken or
+ *     glued onto the front of the next sentence.
+ *   - French abbreviations (M., Mme., Dr.) can split wrongly, but in
+ *     practice the LLM writes them rarely and a misplit just produces a
+ *     short extra utterance rather than a hard failure.
+ *
+ * Thread-safety: callers must serialize append/flush calls — the intended
+ * driver is the LLM onToken callback which is sequential by construction.
+ */
+class SentenceStreamer(
+    // 4 chars is enough for French greetings like "Bonjour." (8 chars) and
+    // filler responses like "Oui." (4) to count as real sentences. Larger
+    // thresholds cause conversational openers to get swallowed into the
+    // next sentence, which delays first audio.
+    private val minChars: Int = 4,
+    private val onSentence: (String) -> Unit
+) {
+    private val buf = StringBuilder()
+    private val enders = charArrayOf('.', '!', '?', ';', '\n')
+
+    /**
+     * Accept a token fragment from the LLM stream. Every prefix of the
+     * buffer that ends on terminal punctuation and is long enough to be a
+     * meaningful sentence is passed to onSentence(), in order; whatever
+     * follows the last emitted sentence is kept for the next call.
+     */
+    fun append(token: String) {
+        buf.append(token)
+        // `searchFrom` skips enders that closed a too-short candidate; it
+        // goes back to 0 whenever the front of the buffer is consumed.
+        var searchFrom = 0
+        while (true) {
+            val endIdx = buf.indexOfAny(enders, searchFrom)
+            if (endIdx < 0) break
+            val candidate = buf.substring(0, endIdx + 1).trim()
+            when {
+                candidate.none { it.isLetterOrDigit() } -> {
+                    // Punctuation/whitespace only: nothing to speak.
+                    buf.delete(0, endIdx + 1)
+                    searchFrom = 0
+                }
+                candidate.length >= minChars -> {
+                    onSentence(candidate)
+                    buf.delete(0, endIdx + 1)
+                    searchFrom = 0
+                }
+                else -> {
+                    // Too short — keep scanning past this ender for a real
+                    // sentence boundary.
+                    searchFrom = endIdx + 1
+                }
+            }
+        }
+    }
+
+    /**
+     * Emit whatever is left in the buffer even if it lacks terminal
+     * punctuation, then clear the buffer. Call once when the LLM stream
+     * ends so the last fragment doesn't get lost. A remainder with no
+     * letter or digit is discarded.
+     */
+    fun flush() {
+        val rest = buf.toString().trim()
+        buf.clear()
+        if (rest.any { it.isLetterOrDigit() }) onSentence(rest)
+    }
+}