149 lines
5.1 KiB
Python
149 lines
5.1 KiB
Python
#!/usr/bin/env python3
|
|
"""
Test the quality of the code predictor (CP) ExecuTorch build on the NPU
against the Python reference.

1. Run the full Python TTS pipeline, capturing the CP inputs (hidden + cb0_emb).
2. Send those inputs to the CP ET runner on the NPU via TCP (adb forward).
3. Compare the NPU codes with the Python codes.
4. Save both sets of codes for decoding on the tablet.
"""
|
|
import sys
import os
import struct
import socket
import time

# Change the working directory *before* the numeric imports below; the
# original author notes that this avoids numpy import issues.
os.chdir("/tmp")

import warnings

warnings.filterwarnings("ignore")

import torch
import numpy as np

# ── Test configuration ──
# Local snapshot of the TTS model, reference voice sample, text to synthesise,
# and the local TCP port used to reach the CP ET runner.
MODEL = "/home/alf/.cache/huggingface/hub/models--Qwen--Qwen3-TTS-12Hz-0.6B-Base/snapshots/5d83992436eae1d760afd27aff78a71d676296fc"
VOICE = "/opt/Kazeia/voix/damien_15s_24k.wav"
TEXT = "Bonjour, je m'appelle Kazeia."
CP_ET_PORT = 5556

# ── Load the Python reference model on CPU ──
print("Loading model...")
from qwen_tts import Qwen3TTSModel

tts = Qwen3TTSModel.from_pretrained(
    MODEL,
    local_files_only=True,
    device_map="cpu",
)
# Shortcuts to the sub-modules the rest of the script works with.
talker = tts.model.talker
cp = talker.code_predictor
|
|
|
|
# ── Monkey-patch code_predictor.generate to capture inputs + outputs ──
# One dict per code-predictor call, appended in call order by the wrapper.
captured_frames = []

# Keep a handle on the un-patched generate. When it is a bound method, take the
# underlying function so the wrapper can pass the instance explicitly.
original_cp_generate = getattr(cp.generate, "__func__", cp.generate)
|
|
|
|
def patched_cp_generate(self_cp, *args, **kwargs):
    """Record one code-predictor call, then return the original result unchanged.

    The wrapper copies the two embedding rows fed to the code predictor, calls
    the saved ``original_cp_generate`` with the same arguments, and appends a
    dict with keys ``"hidden"``, ``"cb0_emb"`` and ``"py_codes"`` to the
    module-level ``captured_frames`` list.

    Args:
        self_cp: The code-predictor instance the wrapper is bound to.
        *args: Positional arguments, forwarded untouched.
        **kwargs: Keyword arguments, forwarded untouched. Must contain
            ``inputs_embeds``; per the original author's note its shape is
            ``[1, 2, 1024]`` holding ``[past_hidden, cb0_emb]``.

    Returns:
        Whatever the original ``generate`` returns (an object exposing
        ``.sequences``).

    Raises:
        TypeError: If ``inputs_embeds`` is not passed as a keyword argument.
    """
    ie = kwargs.get("inputs_embeds")
    if ie is None:
        # Fail with a clear message instead of "'NoneType' is not subscriptable".
        raise TypeError(
            "patched_cp_generate() requires the 'inputs_embeds' keyword argument"
        )

    # Upcast inside torch before converting: NumPy has no bfloat16, so calling
    # .numpy() directly on a bf16 tensor raises. The trailing astype() keeps
    # the original guarantee that the stored array is an independent float32
    # copy rather than a view of the live tensor.
    hidden = ie[0, 0, :].detach().float().cpu().numpy().astype(np.float32)
    cb0_emb = ie[0, 1, :].detach().float().cpu().numpy().astype(np.float32)

    result = original_cp_generate(self_cp, *args, **kwargs)

    # Per the original author's note, result.sequences is [1, 15]: the codes
    # for CB1-CB15 of this frame.
    py_codes = result.sequences[0].tolist()
    captured_frames.append({
        "hidden": hidden,
        "cb0_emb": cb0_emb,
        "py_codes": py_codes,  # reference codes produced by the Python model
    })
    return result
|
|
|
|
# Install the wrapper as a bound method on this code-predictor instance.
import types

cp.generate = types.MethodType(patched_cp_generate, cp)

# ── Run the full Python pipeline ──
# Every code-predictor call made during generation is recorded in
# captured_frames by the wrapper installed above.
print(f"Generating: '{TEXT}'")
audio_list, sr = tts.generate_voice_clone(
    text=TEXT,
    ref_audio=VOICE,
    language="french",
    x_vector_only_mode=True,
    non_streaming_mode=True,
)
audio = audio_list[0]
print(f"Python: {len(audio)} samples, {len(audio)/sr:.2f}s, {len(captured_frames)} frames captured")

# Keep the reference audio on disk for comparison.
import soundfile as sf

sf.write("/opt/Kazeia/kazeia_PY_REF.wav", audio, sr)
print("Saved: kazeia_PY_REF.wav")
|
|
|
|
# ── Extract CB0 codes from captured data ──
# The wrapper only records the CB0 *embedding* that was fed to the code
# predictor, not the CB0 code itself. Recover the code by reverse lookup: find
# the row of the talker's input-embedding table closest to the captured vector.
talker_emb = talker.get_input_embeddings()
# Upcast before converting: NumPy has no bfloat16, so .numpy() would raise on
# bf16 weights. For float32 weights this is a no-op.
# Original author's note: the table is [vocab_size, 1024].
emb_weight = talker_emb.weight.detach().float().cpu().numpy()

cb0_codes = []
for frame in captured_frames:
    # Squared Euclidean distance from the captured embedding to every row.
    diffs = np.sum((emb_weight - frame["cb0_emb"]) ** 2, axis=1)
    cb0 = int(np.argmin(diffs))
    cb0_codes.append(cb0)
    # An exact table row gives a distance of ~0; anything larger suggests the
    # captured vector did not come straight from this table.
    if diffs[cb0] > 1e-6:
        print(f" WARNING: cb0 lookup imprecise, min_diff={diffs[cb0]:.6f}")

print(f"CB0 codes (first 5): {cb0_codes[:5]}")
|
|
|
|
# ── Send to CP ET runner on NPU ──
# Per-frame response: 15 int32 codes (60 bytes) + 1 float32 timing (4 bytes),
# little-endian, 64 bytes in total.
_FRAME_RESPONSE = struct.Struct("<15if")


def _recv_exact(conn, size, frame_idx):
    """Read exactly ``size`` bytes from ``conn``.

    ``recv`` may return fewer bytes than requested, so keep reading until the
    whole message has arrived.

    Raises:
        RuntimeError: If the peer closes the connection before ``size`` bytes
            were received; ``frame_idx`` is included in the message.
    """
    buf = bytearray()
    while len(buf) < size:
        chunk = conn.recv(size - len(buf))
        if not chunk:
            raise RuntimeError(f"Socket closed at frame {frame_idx}")
        buf += chunk
    return bytes(buf)


print(f"\nConnecting to CP ET runner on port {CP_ET_PORT}...")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

npu_codes_all = []   # one list of 15 codes per frame, in frame order
total_npu_ms = 0     # sum of the timings reported by the runner
mismatches = 0       # frames where at least one code differs from Python

# The context manager closes the socket on every exit path, including a failed
# connect, a timeout, or the runner dropping the connection mid-run.
with sock:
    sock.settimeout(30)
    sock.connect(("127.0.0.1", CP_ET_PORT))
    print("Connected!")

    for i, frame in enumerate(captured_frames):
        # Request: hidden followed by cb0_emb as raw float32
        # (original note: 2*1024*4 = 8192 bytes).
        payload = frame["hidden"].tobytes() + frame["cb0_emb"].tobytes()
        sock.sendall(payload)

        resp = _recv_exact(sock, _FRAME_RESPONSE.size, i)
        *npu_codes, timing = _FRAME_RESPONSE.unpack(resp)
        total_npu_ms += timing
        npu_codes_all.append(npu_codes)

        py_codes = frame["py_codes"]
        match = sum(a == b for a, b in zip(npu_codes, py_codes))
        if match < 15:
            mismatches += 1
        print(f" Frame {i:3d}: NPU {timing:6.1f}ms match {match:2d}/15 "
              f"NPU={npu_codes[:4]} PY={py_codes[:4]}")

n = len(captured_frames)
print(f"\n{'='*60}")
if n:
    print(f"RESULTS: {n} frames, NPU total {total_npu_ms:.0f}ms ({total_npu_ms/n:.1f}ms/frame)")
    print(f"Mismatches: {mismatches}/{n} frames ({100*mismatches/n:.0f}%)")
else:
    # Nothing was captured, so there is no per-frame average or percentage.
    print("RESULTS: 0 frames captured, nothing was sent to the NPU")
|
|
|
|
# ── Save codes for tablet decoding ──
# File layout (little-endian int32): the frame count, then one row per frame
# holding CB0 followed by that frame's code-predictor codes (CB1..CB15).
def _write_codes(path, rows):
    """Write ``rows`` (a list of int lists) to ``path`` in the layout above."""
    with open(path, "wb") as f:
        f.write(struct.pack("<i", len(rows)))
        for row in rows:
            f.write(struct.pack(f"<{len(row)}i", *row))


# Both files share the CB0 column recovered from the Python run; only the
# remaining codes differ (Python reference vs. NPU runner).
codes_py = [
    [cb0] + frame["py_codes"]
    for cb0, frame in zip(cb0_codes, captured_frames)
]
codes_npu = [
    [cb0] + frame_npu_codes
    for cb0, frame_npu_codes in zip(cb0_codes, npu_codes_all)
]

py_path = "/opt/Kazeia/test_codes_python.bin"
npu_path = "/opt/Kazeia/test_codes_npu.bin"

_write_codes(py_path, codes_py)
_write_codes(npu_path, codes_npu)

print(f"\nSaved: {py_path} ({os.path.getsize(py_path)} bytes)")
print(f"Saved: {npu_path} ({os.path.getsize(npu_path)} bytes)")
print("\nNext: push to tablet and decode with V2 decoder")
|