kazeia/kazeia-android/app/src/main/java/com/kazeia/service/KazeiaPipeline.kt

184 lines
6.5 KiB
Kotlin

package com.kazeia.service
import android.util.Log
import com.kazeia.core.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
/**
 * Orchestrates the full pipeline: STT → [Processors chain] → TTS.
 *
 * STT and TTS are independent — they only exchange text.
 * Processors are pluggable and executed in registration order.
 *
 * Observable state is published through [messages], [logs] and [pipelineState].
 */
class KazeiaPipeline {
    companion object {
        private const val TAG = "Pipeline"

        /** Upper bound on the number of lines retained in [logs]. */
        private const val MAX_LOG_LINES = 200

        /**
         * Samples per second used when turning a sample count into an audio
         * duration for the RTF log line.
         * NOTE(review): assumes callers deliver 16 kHz audio — confirm against the recorder.
         */
        private const val SAMPLE_RATE_HZ = 16000
    }

    private var stt: SttEngine? = null
    private var tts: TtsEngine? = null
    private val processors = mutableListOf<MessageProcessor>()
    private val context = ConversationContext()

    private val _messages = MutableStateFlow<List<ChatMessage>>(emptyList())

    /** Chat messages added via [addMessage], oldest first. */
    val messages: StateFlow<List<ChatMessage>> = _messages

    private val _logs = MutableStateFlow<List<String>>(emptyList())

    /** Timestamped log lines written via [log]; at most [MAX_LOG_LINES] are kept. */
    val logs: StateFlow<List<String>> = _logs

    private val _pipelineState = MutableStateFlow<PipelineState>(PipelineState.Idle)

    /** Current stage of the pipeline. */
    val pipelineState: StateFlow<PipelineState> = _pipelineState

    /** Installs [engine] as the speech-to-text backend and logs its class name. */
    fun setStt(engine: SttEngine) {
        stt = engine
        log("STT set: ${engine::class.simpleName}")
    }

    /** Installs [engine] as the text-to-speech backend and logs its class name. */
    fun setTts(engine: TtsEngine) {
        tts = engine
        log("TTS set: ${engine::class.simpleName}")
    }

    /** Appends [processor] to the end of the processor chain. */
    fun addProcessor(processor: MessageProcessor) {
        processors.add(processor)
        log("Processor added: ${processor.name} (${processors.size} total)")
    }

    /** Removes every processor whose name equals [name]. */
    fun removeProcessor(name: String) {
        processors.removeAll { it.name == name }
        log("Processor removed: $name")
    }

    /** Returns a snapshot copy of the processor chain. */
    fun getProcessors(): List<MessageProcessor> = processors.toList()

    /**
     * Process text input through the pipeline: [Processors] → TTS.
     *
     * The input is published as a PATIENT message, run through the processor
     * chain, and a non-blank response is published as a KAZEIA message and
     * spoken when the result asks for it. The pipeline state is back to Idle
     * when this function returns normally.
     */
    suspend fun processText(text: String) {
        log("Input: '$text'")
        addMessage(ChatMessage(role = ChatMessage.Role.PATIENT, text = text))
        context.metadata["last_input"] = text

        val t0 = System.currentTimeMillis()
        val result = runProcessors(text)
        val processingMs = System.currentTimeMillis() - t0

        if (result.responseText.isNotBlank()) {
            log("Response: '${result.responseText.take(60)}...' (${processingMs}ms)")
            addMessage(ChatMessage(role = ChatMessage.Role.KAZEIA, text = result.responseText))
            // Log metadata
            result.metadata.forEach { (k, v) -> log(" $k=$v") }
            // TTS
            if (result.shouldSpeak) {
                speak(result.responseText)
            }
        }

        // Update context history
        // NOTE(review): toMutableList() returns a copy, so the entries added below
        // are discarded and context.history itself is never modified. The
        // declaration of ConversationContext.history is not visible from this
        // file, so the right fix (add to it vs. reassign it) must be confirmed there.
        context.history.toMutableList().apply {
            add(ChatMessage(role = ChatMessage.Role.PATIENT, text = text))
            if (result.responseText.isNotBlank()) {
                add(ChatMessage(role = ChatMessage.Role.KAZEIA, text = result.responseText))
            }
        }

        // runProcessors() left the state at Thinking; without this, a blank or
        // non-spoken response would leave observers stuck on Thinking.
        _pipelineState.value = PipelineState.Idle
    }

    /**
     * Process audio through: STT → [Processors] → TTS.
     *
     * Does nothing when no STT engine has been set. A blank transcription is
     * logged as silence and the pipeline returns to Idle.
     */
    suspend fun processAudio(audioData: ShortArray) {
        val sttEngine = stt ?: return
        _pipelineState.value = PipelineState.Transcribing

        val t0 = System.currentTimeMillis()
        val transcription = sttEngine.transcribe(audioData, context.language)
        val sttMs = System.currentTimeMillis() - t0

        if (transcription.text.isBlank()) {
            log("STT: (silence) ${sttMs}ms")
            // Nothing further will run, so do not leave the state at Transcribing.
            _pipelineState.value = PipelineState.Idle
            return
        }

        // RTF = transcription time / audio duration, both in milliseconds.
        val audioMs = audioData.size * 1000f / SAMPLE_RATE_HZ
        log("STT: '${transcription.text}' ${sttMs}ms (RTF=${"%.2f".format(sttMs.toFloat() / audioMs)})")
        _pipelineState.value = PipelineState.Transcribed(transcription.text)
        processText(transcription.text)
    }

    /**
     * Run text through all processors in chain.
     * First processor that returns shouldContinueChain=false wins; results
     * from processors that ask to continue are not used. Processors that are
     * not ready are skipped, and a processor that throws is logged and skipped.
     * If no processor ends the chain, the input is echoed back.
     */
    private suspend fun runProcessors(text: String): ProcessorResult {
        _pipelineState.value = PipelineState.Thinking
        // Iterate over a snapshot: process() suspends, and addProcessor() /
        // removeProcessor() may run meanwhile, which would otherwise throw
        // ConcurrentModificationException on the next iteration.
        for (processor in processors.toList()) {
            if (!processor.isReady()) continue
            try {
                val t0 = System.currentTimeMillis()
                val result = processor.process(text, context)
                val elapsed = System.currentTimeMillis() - t0
                log("[${processor.name}] ${elapsed}ms → ${if (result.shouldContinueChain) "continue" else "done"}")
                if (!result.shouldContinueChain) {
                    return result
                }
            } catch (e: CancellationException) {
                // Cancellation must propagate; treating it as a processor error
                // would keep a cancelled coroutine running through the chain.
                throw e
            } catch (e: Exception) {
                log("[${processor.name}] ERROR: ${e.message}")
            }
        }
        // No processor handled it → echo
        return ProcessorResult(responseText = text, metadata = mapOf("mode" to "echo"))
    }

    private suspend fun speak(text: String) = speakText(text)

    /**
     * Public entry point for speaking a full (possibly multi-sentence) text.
     * When TTS is Qwen3, text is sentence-split and fed through a streaming
     * session so first audio arrives after the first sentence rather than
     * after the full response is synthesised. Other TTS backends fall back
     * to the legacy one-shot synthesizeAndPlay call.
     *
     * Made public so KazeiaService can route its voice-command replies and
     * the echo-mode playback through the same path — otherwise each TTS
     * site reimplemented the "streaming-or-fallback" dispatch.
     *
     * Does nothing when no TTS engine has been set. TTS failures are logged,
     * not thrown; cancellation is rethrown. The pipeline state is reset to
     * Idle on every exit path.
     */
    suspend fun speakText(text: String) {
        val ttsEngine = tts ?: return
        _pipelineState.value = PipelineState.Speaking
        try {
            val qwen = ttsEngine as? com.kazeia.tts.Qwen3TtsEngine
            if (qwen != null) {
                qwen.startStreamingSession()
                try {
                    val streamer = com.kazeia.tts.SentenceStreamer { s -> qwen.enqueueSentence(s) }
                    streamer.append(text)
                    streamer.flush()
                } finally {
                    // Always pair start with end so a failure while enqueueing
                    // does not leave a streaming session open.
                    qwen.endStreamingSession()
                }
            } else {
                ttsEngine.synthesizeAndPlay(text, context.language,
                    onComplete = { _pipelineState.value = PipelineState.Idle }
                )
            }
        } catch (e: CancellationException) {
            throw e
        } catch (e: Exception) {
            log("TTS error: ${e.message}")
        } finally {
            _pipelineState.value = PipelineState.Idle
        }
    }

    /** Publishes a new message list consisting of the current one plus [msg]. */
    fun addMessage(msg: ChatMessage) {
        _messages.value = _messages.value + msg
    }

    /**
     * Writes [msg] to Logcat at info level and appends it, prefixed with an
     * HH:mm:ss.SSS timestamp, to [logs]. Only the most recent
     * [MAX_LOG_LINES] lines are retained.
     */
    fun log(msg: String) {
        Log.i(TAG, msg)
        val time = java.text.SimpleDateFormat("HH:mm:ss.SSS", java.util.Locale.FRANCE)
            .format(java.util.Date())
        _logs.value = _logs.value.takeLast(MAX_LOG_LINES - 1) + "$time $msg"
    }

    /** Releases the STT engine, the TTS engine and every registered processor. */
    fun release() {
        stt?.release()
        tts?.release()
        processors.forEach { it.release() }
    }
}