173 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			173 lines
		
	
	
		
			6.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import pyaudio
 | ||
| import wave
 | ||
| import tempfile
 | ||
| import os
 | ||
| import requests
 | ||
| import time
 | ||
| import sys
 | ||
| 
 | ||
| # 原代码3. 百度在线TTS模块完整逻辑
 | ||
| class BaiduOnlineTTS:
 | ||
|     def __init__(self, api_key, secret_key):
 | ||
|         """初始化百度在线TTS"""
 | ||
|         self.api_key = api_key
 | ||
|         self.secret_key = secret_key
 | ||
|         self.access_token = None
 | ||
|         self.token_expires = 0
 | ||
|         
 | ||
|         # 初始化音频播放器
 | ||
|         self.audio_player = pyaudio.PyAudio()
 | ||
|         
 | ||
|         # TTS配置参数
 | ||
|         self.default_options = {
 | ||
|             'vol': 5,    # 音量(0-15)
 | ||
|             'spd': 5,    # 语速(0-9)
 | ||
|             'pit': 5,    # 音调(0-9)
 | ||
|             'per': 0     # 发音人(0:女,1:男,3:情感女,4:情感男)
 | ||
|         }
 | ||
|         
 | ||
|         # 获取初始访问令牌
 | ||
|         if not self._get_access_token():
 | ||
|             raise Exception("无法获取百度API访问令牌,请检查密钥是否正确")
 | ||
| 
 | ||
|     def _get_access_token(self):
 | ||
|         """获取百度API访问令牌"""
 | ||
|         # 检查令牌是否仍然有效
 | ||
|         if self.access_token and time.time() < self.token_expires - 300:
 | ||
|             return True
 | ||
|             
 | ||
|         try:
 | ||
|             url = f"https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id={self.api_key}&client_secret={self.secret_key}"
 | ||
|             response = requests.get(url)
 | ||
|             result = response.json()
 | ||
|             
 | ||
|             if "access_token" in result:
 | ||
|                 self.access_token = result["access_token"]
 | ||
|                 self.token_expires = time.time() + result["expires_in"]
 | ||
|                 print("✅ 成功获取百度API访问令牌")
 | ||
|                 return True
 | ||
|             else:
 | ||
|                 print(f"❌ 获取令牌失败: {result}")
 | ||
|                 return False
 | ||
|         except Exception as e:
 | ||
|             print(f"❌ 获取令牌时发生错误: {str(e)}")
 | ||
|             return False
 | ||
| 
 | ||
|     def text_to_speech(self, text, options=None, save_path=None):
 | ||
|         """将文本转换为语音"""
 | ||
|         # 确保令牌有效
 | ||
|         if not self._get_access_token():
 | ||
|             return None
 | ||
|             
 | ||
|         # 合并配置参数
 | ||
|         params = self.default_options.copy()
 | ||
|         if options:
 | ||
|             params.update(options)
 | ||
|             
 | ||
|         try:
 | ||
|             # 对文本进行URL编码
 | ||
|             encoded_text = requests.utils.quote(text)
 | ||
|             url = f"https://tsn.baidu.com/text2audio?tex={encoded_text}&lan=zh&cuid=baidu-tts-python&ctp=1&tok={self.access_token}"
 | ||
|             
 | ||
|             # 添加合成参数
 | ||
|             for key, value in params.items():
 | ||
|                 url += f"&{key}={value}"
 | ||
|                 
 | ||
|             # 发送请求
 | ||
|             response = requests.get(url)
 | ||
|             
 | ||
|             # 检查响应是否为音频数据
 | ||
|             if response.headers.get("Content-Type", "").startswith("audio/"):
 | ||
|                 # 保存文件(如果需要)
 | ||
|                 if save_path:
 | ||
|                     with open(save_path, "wb") as f:
 | ||
|                         f.write(response.content)
 | ||
|                     print(f"✅ 音频已保存至: {save_path}")
 | ||
|                 
 | ||
|                 return response.content
 | ||
|             else:
 | ||
|                 # 解析错误信息
 | ||
|                 try:
 | ||
|                     error = response.json()
 | ||
|                     print(f"❌ 语音合成失败: {error.get('err_msg', '未知错误')}")
 | ||
|                 except:
 | ||
|                     print(f"❌ 语音合成失败,响应内容: {response.text}")
 | ||
|                 return None
 | ||
|                 
 | ||
|         except Exception as e:
 | ||
|             print(f"❌ 语音合成时发生错误: {str(e)}")
 | ||
|             return None
 | ||
| 
 | ||
|     def speak(self, text, options=None):
 | ||
|         """直接播放文本转换的语音"""
 | ||
|         # 全局变量由调度脚本传入,此处保留原逻辑调用
 | ||
|         from main_scheduler import feedback_playing
 | ||
|         if feedback_playing:
 | ||
|             return False
 | ||
|             
 | ||
|         feedback_playing = True
 | ||
|         
 | ||
|         # 限制文本长度(百度API有长度限制)
 | ||
|         if len(text) > 1024:
 | ||
|             print("⚠️ 文本过长,将截断为1024字符")
 | ||
|             text = text[:1024]
 | ||
|             
 | ||
|         # 获取音频数据
 | ||
|         audio_data = self.text_to_speech(text, options)
 | ||
|         if not audio_data:
 | ||
|             feedback_playing = False
 | ||
|             return False
 | ||
|             
 | ||
|         try:
 | ||
|             # 创建临时MP3文件
 | ||
|             with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_file:
 | ||
|                 temp_file.write(audio_data)
 | ||
|                 temp_filename = temp_file.name
 | ||
|             
 | ||
|             # 转换为WAV格式(适配pyaudio)
 | ||
|             from pydub import AudioSegment
 | ||
|             audio = AudioSegment.from_mp3(temp_filename)
 | ||
|             with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as wav_file:
 | ||
|                 audio.export(wav_file.name, format="wav")
 | ||
|                 wav_filename = wav_file.name
 | ||
|             
 | ||
|             # 播放WAV文件
 | ||
|             wf = wave.open(wav_filename, 'rb')
 | ||
|             stream = self.audio_player.open(
 | ||
|                 format=self.audio_player.get_format_from_width(wf.getsampwidth()),
 | ||
|                 channels=wf.getnchannels(),
 | ||
|                 rate=wf.getframerate(),
 | ||
|                 output=True
 | ||
|             )
 | ||
|             
 | ||
|             # 播放音频
 | ||
|             chunk = 1024
 | ||
|             data = wf.readframes(chunk)
 | ||
|             while data and feedback_playing:
 | ||
|                 stream.write(data)
 | ||
|                 data = wf.readframes(chunk)
 | ||
|             
 | ||
|             # 清理资源
 | ||
|             stream.stop_stream()
 | ||
|             stream.close()
 | ||
|             wf.close()
 | ||
|             
 | ||
|             print(f"✅ 语音播放完成: {text[:20]}...")
 | ||
|             return True
 | ||
|             
 | ||
|         except Exception as e:
 | ||
|             print(f"❌ 播放语音时发生错误: {str(e)}")
 | ||
|             return False
 | ||
|             
 | ||
|         finally:
 | ||
|             # 删除临时文件
 | ||
|             if 'temp_filename' in locals() and os.path.exists(temp_filename):
 | ||
|                 os.remove(temp_filename)
 | ||
|             if 'wav_filename' in locals() and os.path.exists(wav_filename):
 | ||
|                 os.remove(wav_filename)
 | ||
|             feedback_playing = False
 | ||
| 
 | ||
|     def close(self):
 | ||
|         """释放资源"""
 | ||
|         self.audio_player.terminate()
 | ||
|         print("✅ TTS资源已释放") |