Add DSP warmup + fix pipeline thread contention

- Warmup forward() for talker+CP during init (avoids 7s DSP compilation
  on first pipeline run)
- Cancel previous pipeline job before starting new one
- Use Dispatchers.IO for pipeline intent

First run after warmup: talker 19ms/step, CP 59ms/step → RTF ~1.9

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Kazeia Team 2026-04-09 09:24:18 +02:00
parent 8bfe6c7445
commit 6e6c562d53
2 changed files with 50 additions and 5 deletions

View File

@@ -79,6 +79,7 @@ class KazeiaService : Service() {
val aiWorkload: StateFlow<AiWorkload> = _aiWorkload
private val serviceScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
private var currentPipelineJob: kotlinx.coroutines.Job? = null
private val _isListening = MutableStateFlow(false)
val isListening: StateFlow<Boolean> = _isListening
@@ -108,12 +109,18 @@ class KazeiaService : Service() {
// Auto-pipeline trigger via: am startservice -n com.kazeia/.service.KazeiaService --ez run_pipeline true
if (intent?.getBooleanExtra("run_pipeline", false) == true) {
log("Auto-pipeline triggered via intent")
serviceScope.launch {
// Cancel any running pipeline to avoid blocking
currentPipelineJob?.cancel()
currentPipelineJob = serviceScope.launch(Dispatchers.IO) {
// Wait for TTS to be loaded
while (!::tts.isInitialized || tts !is com.kazeia.tts.Qwen3TtsEngine) {
kotlinx.coroutines.delay(1000)
kotlinx.coroutines.delay(500)
}
processTextInput("pipeline")
val embedsPath = "/data/local/tmp/kazeia/models/qwen3-tts-npu/full_pipeline_embeds.bin"
val qwenTts = tts as? com.kazeia.tts.Qwen3TtsEngine ?: return@launch
val audio = qwenTts.generateFromEmbeds(embedsPath)
log("Pipeline done: ${audio.size} samples (${audio.size/24000f}s)")
// Audio is played by the TTS engine internally
}
}
intent?.getStringExtra("full_pipeline")?.let { embedsPath ->
@@ -912,8 +919,10 @@ class KazeiaService : Service() {
fun processTextInput(text: String) {
log("processTextInput: '$text'")
serviceScope.launch {
// Special test commands
// Special test commands — cancel previous pipeline first
if (text.trim().lowercase().let { it.startsWith("pipeline") || it.startsWith("!pipeline") || it.startsWith("\\!pipeline") || it == "go" }) {
currentPipelineJob?.cancel()
currentPipelineJob = kotlin.coroutines.coroutineContext[kotlinx.coroutines.Job]
val embedsPath = "/data/local/tmp/kazeia/models/qwen3-tts-npu/full_pipeline_embeds.bin"
val wavPath = "/data/local/tmp/kazeia/tts_output.wav"
addMessage(ChatMessage(role = ChatMessage.Role.SYSTEM, text = "Running full pipeline..."))

View File

@@ -265,11 +265,47 @@ class Qwen3TtsEngine(
nlog("Talker .pte JNI loaded+compiled: ${System.currentTimeMillis() - t0}ms, result=$lm")
if (lm != 0) { nlog("Talker .pte loadMethod failed"); talkerPteModule = null }
else {
// Load rotary tables for talker .pte
val path = "/data/local/tmp/kazeia/models"
talkerPteRotaryCos = loadNpy("$path/talker_pte_rotary_cos.npy")
talkerPteRotarySin = loadNpy("$path/talker_pte_rotary_sin.npy")
nlog("Talker .pte rotary: ${talkerPteRotaryCos?.size} floats")
// Warmup both models: first forward() triggers QNN DSP compilation (~7s)
// Better to pay this cost at init than at first pipeline run
val tw = System.currentTimeMillis()
try {
val dE = FloatArray(TALKER_DIM)
val dM = FloatArray(TALKER_PTE_KV_LEN) { -1e9f }; dM[TALKER_PTE_KV_LEN - 1] = 0f
val dC = FloatArray(TALKER_HEAD_DIM) { 1f }
val dS = FloatArray(TALKER_HEAD_DIM)
val tkvSz = TALKER_HEADS * TALKER_PTE_KV_LEN * TALKER_HEAD_DIM
val ins = mutableListOf(
org.pytorch.executorch.EValue.from(org.pytorch.executorch.Tensor.fromBlob(dE, longArrayOf(1,1,TALKER_DIM.toLong()))),
org.pytorch.executorch.EValue.from(org.pytorch.executorch.Tensor.fromBlob(dM, longArrayOf(1,1,1,TALKER_PTE_KV_LEN.toLong()))),
org.pytorch.executorch.EValue.from(org.pytorch.executorch.Tensor.fromBlob(dC, longArrayOf(1,1,TALKER_HEAD_DIM.toLong()))),
org.pytorch.executorch.EValue.from(org.pytorch.executorch.Tensor.fromBlob(dS, longArrayOf(1,1,TALKER_HEAD_DIM.toLong())))
)
for (i in 0 until TALKER_LAYERS * 2) ins.add(org.pytorch.executorch.EValue.from(
org.pytorch.executorch.Tensor.fromBlob(FloatArray(tkvSz), longArrayOf(1,TALKER_HEADS.toLong(),TALKER_PTE_KV_LEN.toLong(),TALKER_HEAD_DIM.toLong()))))
talkerPteModule!!.forward(*ins.toTypedArray())
nlog("Talker warmup: ${System.currentTimeMillis() - tw}ms")
} catch (e: Exception) { nlog("Talker warmup failed: ${e.message}") }
// CP warmup
val cw = System.currentTimeMillis()
try {
val ckvSz = CP_KV_HEADS * CP_KV_LEN * CP_HEAD_DIM
val cIns = mutableListOf(
org.pytorch.executorch.EValue.from(org.pytorch.executorch.Tensor.fromBlob(FloatArray(TALKER_DIM), longArrayOf(1,1,TALKER_DIM.toLong()))),
org.pytorch.executorch.EValue.from(org.pytorch.executorch.Tensor.fromBlob(FloatArray(CP_KV_LEN){-1e9f}.also{it[CP_KV_LEN-1]=0f}, longArrayOf(1,1,1,CP_KV_LEN.toLong()))),
org.pytorch.executorch.EValue.from(org.pytorch.executorch.Tensor.fromBlob(FloatArray(CP_HEAD_DIM){1f}, longArrayOf(1,1,CP_HEAD_DIM.toLong()))),
org.pytorch.executorch.EValue.from(org.pytorch.executorch.Tensor.fromBlob(FloatArray(CP_HEAD_DIM), longArrayOf(1,1,CP_HEAD_DIM.toLong())))
)
for (i in 0 until CP_LAYERS * 2) cIns.add(org.pytorch.executorch.EValue.from(
org.pytorch.executorch.Tensor.fromBlob(FloatArray(ckvSz), longArrayOf(1,CP_KV_HEADS.toLong(),CP_KV_LEN.toLong(),CP_HEAD_DIM.toLong()))))
cpPteModule!!.forward(*cIns.toTypedArray())
nlog("CP warmup: ${System.currentTimeMillis() - cw}ms")
} catch (e: Exception) { nlog("CP warmup failed: ${e.message}") }
}
} catch (e: Exception) {
nlog("Talker .pte JNI failed: ${e.message}")