| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583 |
- """
- TTS语音合成客户端模块 - 优化版
- 优化内容:
- 1. AudioStreamManager - 减少锁开销,使用bytearray缓冲,自动获取bytes_per_sample
- 2. ByteFIFO - 改进read方法,减少重复操作
- 3. WebSocket连接管理 - 使用Event替代轮询,改进音频缓存合并
- 4. 缓存播放 - 使用Event替代sleep,改进对齐计算
- 5. 异常处理 - 精确捕获异常类型,优化日志输出
- 6. 全局控制逻辑 - 使用Event替代硬等待,改进状态管理
- """
- import websocket
- import sounddevice as sd
- from time import mktime
- from wsgiref.handlers import format_date_time
- from urllib.parse import urlparse, urlencode
- import threading
- import time
- import json
- import hmac
- import hashlib
- import datetime
- import base64
- import queue
- import platform
- system = platform.system().lower()
- if system == "linux":
- sd.default.device = 'pulse'
- elif system == "windows":
- sd.default.device = None
- elif system == "darwin":
- sd.default.device = None
- import io
- from collections import deque
- from config.config.settings import config
- from utils.pc2_requests import _send_led_color_task
- from utils.tts_cache import get_cached_audio, save_audio_cache
- from utils.logger import logger
- # ==============================
- # 全局音频配置
- # ==============================
- UNIFIED_AUDIO_CONFIG = {
- 'samplerate': 16000,
- 'channels': 1,
- 'dtype': 'int16', # 16-bit PCM
- 'blocksize': 512, # 非 Windows
- 'latency': 'low',
- 'device': None,
- }
- WINDOWS_AUDIO_CONFIG = {
- 'samplerate': 16000,
- 'channels': 1,
- 'dtype': 'int16',
- 'blocksize': 1024, # Windows 稳定性更好
- 'latency': 'low',
- 'device': None,
- }
- # ==============================
- # 优化版字节 FIFO 缓冲(线程安全)
- # ==============================
- class ByteFIFO:
- def __init__(self):
- self._buf = deque()
- self._size = 0
- self._lock = threading.Lock()
- def write(self, data: bytes):
- if not data:
- return
- with self._lock:
- self._buf.append(data)
- self._size += len(data)
- def read(self, nbytes: int) -> bytes:
- """优化版read方法,减少重复操作"""
- if nbytes <= 0:
- return b''
-
- out = bytearray()
- with self._lock:
- while nbytes > 0 and self._buf:
- chunk = self._buf[0]
- take = min(len(chunk), nbytes)
- out.extend(chunk[:take])
- if take < len(chunk):
- self._buf[0] = chunk[take:]
- else:
- self._buf.popleft()
- self._size -= take
- nbytes -= take
- return bytes(out)
- def clear(self):
- with self._lock:
- self._buf.clear()
- self._size = 0
- def __len__(self):
- with self._lock:
- return self._size
- # ==============================
- # 优化版音频流管理器(单例 RawOutputStream + 回调)
- # ==============================
- class AudioStreamManager:
- def __init__(self):
- self._lock = threading.Lock()
- self._stream = None
- self._fifo = ByteFIFO()
- self._is_windows = platform.system().lower() == 'windows'
- self._config = WINDOWS_AUDIO_CONFIG if self._is_windows else UNIFIED_AUDIO_CONFIG
- # 自动获取每帧字节数,更通用
- try:
- # 创建临时流来获取dtype信息
- temp_stream = sd.RawOutputStream(
- samplerate=self._config['samplerate'],
- channels=self._config['channels'],
- dtype=self._config['dtype'],
- blocksize=1, # 最小块大小
- device=self._config['device']
- )
- self._bytes_per_sample = temp_stream.dtype.itemsize
- temp_stream.close()
- except Exception:
- # 回退到默认值
- self._bytes_per_sample = 2 # int16
-
- self._frame_bytes = self._config['channels'] * self._bytes_per_sample
- def get_audio_config(self):
- return self._config
- def _callback(self, outdata, frames, time_info, status):
- """优化版回调,使用memoryview直接填充,避免bytes拼接"""
- required = frames * self._frame_bytes
- data = self._fifo.read(required)
- if len(data) < required:
- # 使用memoryview直接填充零,避免bytes拼接
- outdata[:len(data)] = data
- outdata[len(data):] = b'\x00' * (required - len(data))
- else:
- outdata[:] = data
- def init_stream(self):
- with self._lock:
- if self._stream and self._stream.active:
- return self._stream
- cfg = self._config
- self._fifo.clear()
- self._stream = sd.RawOutputStream(
- samplerate=cfg['samplerate'],
- channels=cfg['channels'],
- dtype=cfg['dtype'],
- blocksize=cfg['blocksize'],
- latency=cfg['latency'],
- device=cfg['device'],
- callback=self._callback,
- )
- self._stream.start()
- logger.info(f"[音频] RawOutputStream 初始化成功: blocksize={cfg['blocksize']}, samplerate={cfg['samplerate']}")
- return self._stream
- def play_bytes(self, chunk: bytes):
- """喂入原始 PCM 字节(16kHz, 16bit, mono)"""
- if not chunk:
- return
- self._fifo.write(chunk)
- def clear_buffer(self):
- """清空音频缓冲区"""
- with self._lock:
- self._fifo.clear()
- logger.info("[音频] 缓冲区已清空")
- def stop_stream(self):
- with self._lock:
- if self._stream:
- try:
- self._stream.stop()
- self._stream.close()
- logger.info("[音频] 音频流已安全关闭")
- except Exception as e:
- logger.info(f"[音频] 关闭失败: {e}")
- finally:
- self._stream = None
- self._fifo.clear()
- def expected_chunk_size(self):
- """建议的投喂对齐字节数(一个 block 对应的字节量)"""
- return self._config['blocksize'] * self._frame_bytes
- _audio_manager = AudioStreamManager()
- # ==============================
- # 优化版 TTS 客户端
- # ==============================
- class AIUITTSClient:
- def __init__(self, text: str, use_cache: bool = True):
- if not text or not text.strip():
- raise ValueError("文本内容不能为空")
- self.text = text.strip()
- self.use_cache = use_cache
- self.call_time = time.time()
- self.start_play_time = None
- from config.config.settings import config
- self.handshake = self.assemble_auth_url(
- config.XUNFEI_STREAMING_TTS_URL)
- self.ws = None
- self._play_thread = None
- self._cache_play_thread = None # 缓存播放线程引用
-
- # 使用BytesIO替代列表,减少内存分配
- self._audio_buffer = io.BytesIO()
- self._interrupted = threading.Event()
- self._connection_established = threading.Event() # 使用Event替代bool
- self._audio_done_event = threading.Event() # 音频完成事件
-
- self._max_retries = 3 # 最大重试次数
- self._retry_count = 0 # 当前重试次数
- def is_connection_ready(self, ws):
- """检查WebSocket连接是否就绪"""
- try:
- return (ws and ws.sock and
- hasattr(ws.sock, 'connected') and
- ws.sock.connected and
- not self._interrupted.is_set())
- except Exception:
- return False
- def assemble_auth_url(self, base_url):
- from config.config.settings import config
- host = urlparse(base_url).netloc
- path = urlparse(base_url).path
- now = datetime.datetime.now()
- date = format_date_time(mktime(now.timetuple()))
- signature_origin = f"host: {host}\ndate: {date}\nGET {path} HTTP/1.1"
- signature_sha = hmac.new(config.XUNFEI_API_SECRET.encode(),
- signature_origin.encode(), digestmod=hashlib.sha256).digest()
- signature_base64 = base64.b64encode(signature_sha).decode()
- authorization_origin = (
- f'api_key="{config.XUNFEI_API_KEY}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_base64}"'
- )
- authorization = base64.b64encode(
- authorization_origin.encode()).decode()
- v = {"host": host, "date": date, "authorization": authorization}
- return base_url + "?" + urlencode(v)
- def send_text(self):
- """发送文本数据,优化连接检查"""
- if not self._is_ws_connected():
- raise Exception("WebSocket连接不可用")
-
- from config.config.settings import config
- aiui_data = {
- "header": {
- "sn": "yd-00:00:00:00:00:01",
- "appid": config.XUNFEI_APP_ID,
- "stmid": "text-1",
- "status": 3,
- "scene": 'IFLYTEK.hts'
- },
- "parameter": {
- "tts": {
- "vcn": config.TTS_VOICE,
- "tts": {
- "channels": 1,
- "sample_rate": 16000,
- "bit_depth": 16,
- "encoding": "raw"
- }
- }
- },
- "payload": {
- "text": {
- "compress": "raw",
- "format": "plain",
- "text": base64.b64encode(self.text.encode()).decode(),
- "encoding": "utf8",
- "status": 3
- }
- }
- }
- self.ws.send(json.dumps(aiui_data, ensure_ascii=False))
- def _is_ws_connected(self):
- """辅助函数:检查WebSocket连接状态"""
- try:
- return (self.ws and hasattr(self.ws, 'sock') and self.ws.sock and
- hasattr(self.ws.sock, 'connected') and self.ws.sock.connected)
- except Exception:
- return False
- def interrupt(self):
- """中断播放"""
- self._interrupted.set()
- logger.info("[中断] 开始中断播放")
-
- # 完全停止音频流,立即停止播放
- _audio_manager.stop_stream()
- logger.info("[TTS] 音频流已完全停止")
- # 关闭WebSocket连接
- if self.ws:
- try:
- # 标记连接为不可用
- self._connection_established.clear()
- # 关闭连接
- self.ws.close()
- logger.info("[中断] WebSocket连接已关闭")
- except Exception as e:
- logger.info(f"[中断] 关闭WebSocket失败: {e}")
- finally:
- # 确保WebSocket对象被清空
- self.ws = None
- logger.info("[中断] 播放已中断")
- # ---- WebSocket 回调 ----
- def on_open(self, ws):
- logger.info("[TTS] WebSocket连接已建立")
- self._connection_established.set()
- self._retry_count = 0 # 重置重试计数
- # 连接建立后立即发送数据
- try:
- # 检查连接是否仍然有效
- if self._is_ws_connected():
- self.send_text()
- logger.info("[TTS] 数据发送成功")
- else:
- logger.warning("[TTS] 连接已无效,无法发送数据")
- self._connection_established.clear()
- except Exception as e:
- logger.warning(f"[TTS] 发送数据失败: {e}")
- # 发送失败时,标记连接为不可用
- self._connection_established.clear()
- def on_message(self, ws, message):
- try:
- data = json.loads(message)
- if data.get("header", {}).get("code") != 0:
- logger.info(f"TTS错误: {data}")
- ws.close()
- return
- if self._interrupted.is_set():
- ws.close()
- return
- tts_audio_b64 = data.get("payload", {}).get("tts", {}).get("audio")
- if tts_audio_b64:
- try:
- # 16kHz/16bit/mono 裸PCM
- pcm = base64.b64decode(tts_audio_b64)
- self._audio_buffer.write(pcm) # 写入BytesIO
- _audio_manager.play_bytes(pcm) # 交给回调流
- except Exception as e:
- logger.info(f"[错误] 音频解码失败: {e}")
- # 检查是否完成
- if data.get("header", {}).get("status") == 2:
- logger.info("[TTS] 收到结束状态,准备关闭连接")
- # 收尾:缓存
- if self.use_cache and self._audio_buffer.tell() > 0:
- try:
- audio_data = self._audio_buffer.getvalue()
- save_audio_cache(self.text, audio_data)
- logger.info(f"[TTS] 缓存保存成功,总大小: {len(audio_data)} bytes")
- except Exception as e:
- logger.info(f"[TTS] 缓存保存失败: {e}")
- # 等待音频播放完成后再关闭连接
- self._wait_for_audio_completion()
- # 确保音频流停止
- if _audio_manager._stream and _audio_manager._stream.active:
- try:
- _audio_manager._stream.stop()
- logger.info("[TTS] 音频流已停止")
- except Exception as e:
- logger.info(f"[TTS] 停止音频流失败: {e}")
- # 关闭WebSocket连接
- ws.close()
- logger.info("[TTS] WebSocket连接已关闭")
- except json.JSONDecodeError as e:
- logger.info(f"[TTS] JSON解析错误: {e}")
- except Exception as e:
- logger.info(f"[TTS] 消息处理异常: {e}")
- import traceback
- traceback.print_exc()
- def _wait_for_audio_completion(self):
- """优化版等待音频播放完成,使用Event替代轮询"""
- try:
- if self._audio_buffer.tell() == 0:
- logger.info("[TTS] 无音频数据,无需等待")
- return
- # 计算总音频时长
- total_bytes = self._audio_buffer.tell()
- cfg = _audio_manager.get_audio_config()
- bytes_per_sec = cfg['samplerate'] * cfg['channels'] * 2
- audio_duration = total_bytes / max(1, bytes_per_sec)
- logger.info(f"[TTS] 音频总时长: {audio_duration:.2f}秒,等待播放完成...")
- # 使用Event等待,更高效
- self._audio_done_event.wait(timeout=audio_duration + 0.5)
- logger.info("[TTS] 音频播放完成")
- # 播放完成后,完全停止音频流,避免继续播放静音
- _audio_manager.stop_stream()
- logger.info("[TTS] 音频流已完全停止")
- except Exception as e:
- logger.info(f"[TTS] 等待音频完成时出错: {e}")
- def on_error(self, ws, error):
- # 改进错误处理:区分不同类型的错误
- error_str = str(error) if error else "未知错误"
- # 忽略一些常见的非致命错误
- if "already closed" in error_str.lower() or "connection is closed" in error_str.lower():
- logger.info(f"[TTS] 连接已关闭,忽略错误: {error_str}")
- return
- # 记录其他错误但不立即关闭连接
- logger.info(f"[TTS] 连接错误: {error_str}")
- # 只有在严重错误时才关闭连接
- if "timeout" in error_str.lower() or "connection refused" in error_str.lower():
- logger.info("[TTS] 严重连接错误,关闭连接")
- ws.close()
- def on_close(self, ws, code, reason):
- self._connection_established.clear()
- close_info = f"代码:{code}" if code else ""
- if reason:
- close_info += f", 原因:{reason}"
- logger.info(f"[TTS] 连接关闭 {close_info}")
- # 连接关闭时,完全停止音频流
- _audio_manager.stop_stream()
- logger.info("[TTS] 连接关闭时音频流已完全停止")
- # ---- 优化版缓存播放 ----
- def play_cached_audio(self, audio_data: bytes):
- logger.info(f"[缓存] 开始播放,大小={len(audio_data)} bytes")
- _audio_manager.init_stream()
- # 尽量按块对齐喂入
- align = _audio_manager.expected_chunk_size()
- if align <= 0:
- align = 2048
- for i in range(0, len(audio_data), align):
- # 检查是否被打断
- if self._interrupted.is_set():
- logger.info("[缓存] 播放被中断")
- return
- chunk = audio_data[i:i+align]
- _audio_manager.play_bytes(chunk)
- # 使用Event等待,更响应中断
- if self._interrupted.wait(0.01):
- logger.info("[缓存] 播放被中断")
- return
- # 播放完成后的等待,也要检查打断状态
- if not self._interrupted.is_set():
- cfg = _audio_manager.get_audio_config()
- bytes_per_sec = cfg['samplerate'] * cfg['channels'] * 2
- remaining_time = len(audio_data) / max(1, bytes_per_sec) + 0.05
- # 使用Event等待,更高效
- self._interrupted.wait(timeout=remaining_time)
- def start(self):
- # 先播缓存
- if self.use_cache:
- cached_audio = get_cached_audio(self.text)
- if cached_audio:
- logger.info("[缓存] 命中,使用缓存播放")
- self._cache_play_thread = threading.Thread(
- target=self.play_cached_audio, args=(cached_audio,), daemon=True)
- self._cache_play_thread.start()
- return # 确保缓存播放后直接返回,不执行API请求
- # 初始化音频流并请求合成
- _audio_manager.init_stream()
- logger.info(f"[API] 请求TTS: {self.text[:30]}...")
- # 确保每次都创建新的WebSocket连接
- self._create_new_connection()
- def _create_new_connection(self):
- """创建新的WebSocket连接"""
- logger.info("[TTS] 开始创建新的WebSocket连接")
-
- # 重置连接状态
- self._connection_established.clear()
- self._interrupted.clear()
- logger.info("[TTS] 连接状态已重置")
-
- # 创建新的WebSocket连接
- self.ws = websocket.WebSocketApp(
- self.handshake,
- on_open=self.on_open,
- on_message=self.on_message,
- on_error=self.on_error,
- on_close=self.on_close,
- )
- logger.info("[TTS] WebSocket对象已创建,准备启动连接线程")
-
- # 启动WebSocket线程
- self._play_thread = threading.Thread(
- target=lambda: self.ws.run_forever(), daemon=True)
- self._play_thread.start()
-
- logger.info("[TTS] WebSocket连接线程已启动")
- # ==============================
- # 优化版全局控制
- # ==============================
- _current_tts_client = None
- _lock = threading.Lock()
- _playback_done_event = threading.Event() # 播放完成事件
- def play_text_async(text: str, use_cache: bool = True):
- # 安全替换昵称
- machinename = getattr(config, 'machinename', None) or '小勇'
- text = text.replace('小勇', machinename)
- global _current_tts_client
- if not text or not isinstance(text, str) or text.strip() == "":
- logger.info("[错误] 文本内容不能为空")
- return
- with _lock:
- if _current_tts_client:
- logger.info("[停止] 打断旧播放")
- _current_tts_client.interrupt()
- _current_tts_client = None
- # 使用Event等待,更高效
- _playback_done_event.wait(timeout=0.3)
-
- # 创建新的TTS客户端
- tts_client = AIUITTSClient(text, use_cache)
- _current_tts_client = tts_client
-
- # 启动新的TTS客户端
- tts_client.start()
- def stop_playback():
- global _current_tts_client
- with _lock:
- if _current_tts_client:
- _current_tts_client.interrupt()
- _current_tts_client = None
- _audio_manager.stop_stream()
- def is_playing() -> bool:
- with _lock:
- return _current_tts_client is not None and not _current_tts_client._interrupted.is_set()
|