###########################################################################################
class QwenTTS(BaseTTS):
def __init__(self, opt, parent):
super().__init__(opt, parent)
self.api_key = getattr(opt, ’QWEN_API_KEY’, ’’) # 阿里云API Key
self.model = getattr(opt, ’QWEN_MODEL’, ’qwen3-tts-flash’) # 模型名稱
self.voice = getattr(opt, ’QWEN_VOICE’, ’Cherry’) # 音色
self.language_type = getattr(opt, ’QWEN_LANGUAGE_TYPE’, ’Chinese’) # 語言類型
self.api_url = "https://dashscope.aliyuncs.com/api/v1/services/aigc/multimodal-generation/generation"
# 創(chuàng)建 tempwav 目錄
self.temp_dir = "tempwav"
os.makedirs(self.temp_dir, exist_ok=True)
# 驗證必要的配置參數(shù)
if not self.api_key:
logger.error("Qwen TTS 配置不完整,請設(shè)置 QWEN_API_KEY")
raise ValueError("Qwen TTS 配置不完整")
def txt_to_audio(self, msg):
text, textevent = msg
# 直接調(diào)用同步版本的 TTS
try:
self.sync_qwen_tts(text, msg)
except Exception as e:
logger.error(f"Qwen TTS 處理失敗: {e}")
# 發(fā)送結(jié)束事件,即使失敗也要通知前端
eventpoint = {’type’: ’tts’, ’status’: ’end’, ’text’: text, ’msgevent’: textevent}
self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), eventpoint)
def sync_qwen_tts(self, text, msg):
"""同步版本的 Qwen TTS"""
start = time.perf_counter()
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
"X-DashScope-SSE": "enable" # 啟用服務(wù)器發(fā)送事件
}
payload = {
"model": self.model,
"input": {
"text": text,
"voice": self.voice,
"language_type": self.language_type
}
}
try:
logger.info(f"Qwen TTS 請求開始: {text[:50]}...")
response = requests.post(
self.api_url,
headers=headers,
json=payload,
stream=True,
timeout=30
)
end = time.perf_counter()
logger.info(f"Qwen TTS 請求時間: {end-start:.2f}s")
if response.status_code != 200:
logger.error(f"Qwen TTS 請求失敗: {response.status_code}, {response.text}")
return
# 收集所有音頻數(shù)據(jù)
all_audio_data = bytearray()
audio_url = None
for line in response.iter_lines(decode_unicode=False):
if not line:
continue
try:
line_str = line.decode(’utf-8’).strip()
# 檢查是否是 SSE 數(shù)據(jù)行
if line_str.startswith(’data:’):
data_str = line_str[5:].strip()
# 檢查結(jié)束標(biāo)記
if data_str == ’[DONE]’:
logger.info("Qwen TTS 流式請求完成")
break
# 解析 JSON 數(shù)據(jù)
data = json.loads(data_str)
# 檢查是否有輸出數(shù)據(jù)
if "output" in data and isinstance(data["output"], dict):
output = data["output"]
# 檢查是否有音頻 URL
if "audio" in output and isinstance(output["audio"], dict):
audio_info = output["audio"]
# 檢查是否有音頻 URL
if "url" in audio_info and audio_info["url"]:
audio_url = audio_info["url"]
logger.info(f"獲取到音頻 URL: {audio_url}")
# 檢查是否有直接的數(shù)據(jù)(雖然看起來是空的)
elif "data" in audio_info and audio_info["data"]:
# 這里的數(shù)據(jù)看起來是空的,但以防萬一還是處理
audio_data = audio_info["data"]
if audio_data and len(audio_data) > 10: # 確保不是空數(shù)據(jù)
try:
chunk_audio = base64.b64decode(audio_data)
all_audio_data.extend(chunk_audio)
logger.debug(f"解碼后音頻塊大小: {len(chunk_audio)}")
except Exception as e:
logger.debug(f"Base64 解碼失敗,可能是空數(shù)據(jù): {e}")
# 檢查錯誤
elif "code" in data and data["code"] != 200:
logger.error(f"Qwen TTS API 錯誤: {data}")
break
except json.JSONDecodeError as e:
logger.warning(f"JSON 解析錯誤: {e}")
continue
except Exception as e:
logger.error(f"處理 Qwen TTS 流時出錯: {e}")
continue
# 處理音頻數(shù)據(jù) - 優(yōu)先使用 URL 下載
if audio_url:
logger.info(f"從 URL 下載音頻: {audio_url}")
try:
# 下載音頻文件
audio_response = requests.get(audio_url, timeout=30)
if audio_response.status_code == 200:
audio_content = audio_response.content
logger.info(f"從 URL 下載音頻成功,大小: {len(audio_content)} 字節(jié)")
self.process_audio_data(audio_content, msg)
else:
logger.error(f"從 URL 下載音頻失敗: {audio_response.status_code}")
# 如果 URL 下載失敗,嘗試使用收集的 base64 數(shù)據(jù)
if all_audio_data:
logger.info(f"使用收集的音頻數(shù)據(jù),大小: {len(all_audio_data)} 字節(jié)")
self.process_audio_data(bytes(all_audio_data), msg)
except Exception as e:
logger.error(f"從 URL 下載音頻時出錯: {e}")
# 如果 URL 下載失敗,嘗試使用收集的 base64 數(shù)據(jù)
if all_audio_data:
logger.info(f"使用收集的音頻數(shù)據(jù),大小: {len(all_audio_data)} 字節(jié)")
self.process_audio_data(bytes(all_audio_data), msg)
elif all_audio_data:
logger.info(f"使用收集的音頻數(shù)據(jù),大小: {len(all_audio_data)} 字節(jié)")
self.process_audio_data(bytes(all_audio_data), msg)
else:
logger.warning("Qwen TTS 未收到音頻數(shù)據(jù)")
except requests.exceptions.RequestException as e:
logger.error(f"Qwen TTS 網(wǎng)絡(luò)請求錯誤: {e}")
except Exception as e:
logger.exception(f"Qwen TTS 未知錯誤: {e}")
def process_audio_data(self, audio_data, msg):
"""處理完整的音頻數(shù)據(jù)"""
text, textevent = msg
try:
# 生成臨時文件名,包含會話ID和時間戳
timestamp = int(time.time())
session_id = getattr(self.opt, ’sessionid’, 0)
temp_file = os.path.join(self.temp_dir, f"qwen_audio_{session_id}_{timestamp}.wav")
with open(temp_file, ’wb’) as f:
f.write(audio_data)
logger.info(f"音頻數(shù)據(jù)已保存到: {temp_file}")
# 使用 soundfile 讀取音頻
byte_stream = BytesIO(audio_data)
# 檢查文件格式
try:
stream, sample_rate = sf.read(byte_stream)
logger.info(f’Qwen TTS 音頻流 {sample_rate}: {stream.shape}’)
except Exception as e:
logger.error(f"無法讀取音頻文件: {e}")
# 可能是其他格式,嘗試使用其他方法
try:
# 重置字節(jié)流
byte_stream.seek(0)
# 嘗試使用 torchaudio
import torchaudio
stream, sample_rate = torchaudio.load(byte_stream)
stream = stream.numpy()[0] # 取第一個聲道
logger.info(f’使用 torchaudio 讀取音頻: {sample_rate}: {stream.shape}’)
except Exception as e2:
logger.error(f"torchaudio 也無法讀取: {e2}")
# 發(fā)送結(jié)束事件
end_event = {’type’: ’tts’, ’status’: ’end’, ’text’: text, ’msgevent’: textevent}
self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), end_event)
return
stream = stream.astype(np.float32)
if stream.ndim > 1:
logger.info(f’音頻有 {stream.shape[1]} 個聲道,只使用第一個’)
stream = stream[:, 0]
if sample_rate != self.sample_rate and stream.shape[0] > 0:
logger.info(f’音頻采樣率為 {sample_rate}, 重采樣為 {self.sample_rate}’)
stream = resampy.resample(x=stream, sr_orig=sample_rate, sr_new=self.sample_rate)
# 發(fā)送開始事件
start_event = {’type’: ’tts’, ’status’: ’start’, ’text’: text, ’msgevent’: textevent}
self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), start_event)
# 分割并發(fā)送音頻幀
streamlen = stream.shape[0]
idx = 0
while streamlen >= self.chunk and self.state == State.RUNNING:
self.parent.put_audio_frame(stream[idx:idx+self.chunk], None)
streamlen -= self.chunk
idx += self.chunk
# 處理剩余數(shù)據(jù)
if streamlen > 0 and self.state == State.RUNNING:
padding = np.zeros(self.chunk - streamlen, dtype=np.float32)
complete_chunk = np.concatenate((stream[idx:], padding))
self.parent.put_audio_frame(complete_chunk, None)
# 發(fā)送結(jié)束事件
end_event = {’type’: ’tts’, ’status’: ’end’, ’text’: text, ’msgevent’: textevent}
self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), end_event)
logger.info("Qwen TTS 音頻處理完成")
except Exception as e:
logger.error(f"處理 Qwen TTS 音頻數(shù)據(jù)時出錯: {e}")
# 確保發(fā)送結(jié)束事件
end_event = {’type’: ’tts’, ’status’: ’end’, ’text’: text, ’msgevent’: textevent}
self.parent.put_audio_frame(np.zeros(self.chunk, np.float32), end_event)
###########################################################################################