|
|
@@ -0,0 +1,1297 @@
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
+"""
|
|
|
+自我声音检测和回声消除模块
|
|
|
+防止系统在播放TTS时误触发语音识别
|
|
|
+"""
|
|
|
+import threading
|
|
|
+import time
|
|
|
+import numpy as np
|
|
|
+from typing import Optional, Callable, Dict, List, Tuple
|
|
|
+from utils.logger import logger
|
|
|
+import hashlib
|
|
|
+import json
|
|
|
+from collections import deque
|
|
|
+from dataclasses import dataclass
|
|
|
+import struct
|
|
|
+from config.config.echo_cancellation_conf import EchoCancellationConf
|
|
|
+
|
|
|
+
|
|
|
@dataclass
class AudioFingerprint:
    """Summary of one chunk of audio, used to compare clips with each other.

    Attributes:
        fingerprint: Hash string identifying the chunk.
        timestamp: Time value attached to the chunk by whoever created it.
        duration: Length of the chunk in seconds.
        energy: Energy measure of the chunk.
        spectral_features: Per-band spectral values of the chunk.
    """

    # Field order is part of the positional constructor; keep it stable.
    fingerprint: str
    timestamp: float
    duration: float
    energy: float
    spectral_features: List[float]
|
|
|
+
|
|
|
+
|
|
|
+class EchoCancellationEngine:
|
|
|
+ """回声消除引擎"""
|
|
|
+
|
|
|
def __init__(self):
    """Load all settings from ``EchoCancellationConf`` and reset runtime state.

    Creates the bounded fingerprint and recording buffers, copies the
    tunables onto the instance, zeroes the statistics, and finally logs
    the outcome of ``EchoCancellationConf.validate_config()``.
    """
    conf = EchoCancellationConf

    # Playback state flags.
    self.is_enabled = conf.ENABLE_ECHO_CANCELLATION
    self.is_playing_tts = False
    self._current_playing_status = False

    # Bounded history buffers; the oldest entries fall off automatically.
    self.tts_audio_fingerprints: deque = deque(maxlen=conf.MAX_TTS_FINGERPRINTS)
    self.recording_buffer: deque = deque(maxlen=conf.MAX_RECORDING_BUFFER)

    # Signal-processing parameters.
    self.sample_rate = conf.SAMPLE_RATE
    self.frame_size = conf.FRAME_SIZE
    self.overlap_ratio = conf.OVERLAP_RATIO
    self.energy_threshold = conf.ENERGY_THRESHOLD
    self.correlation_threshold = conf.CORRELATION_THRESHOLD
    self.time_window = conf.TIME_WINDOW

    # Synchronisation and end-of-playback bookkeeping.
    self._lock = threading.RLock()
    self.last_tts_end_time = 0
    self.tts_fade_out_duration = conf.TTS_FADE_OUT_DURATION

    # Spectral analysis parameters.
    self.fft_size = conf.FFT_SIZE
    self.mel_filters = conf.MEL_FILTERS

    # User-voice detection settings.
    self.voice_detection_config = conf.get_voice_detection_config()

    # TTS filtering settings (copied so the instance owns its own dict).
    self.tts_filtering_config = conf.TTS_FILTERING.copy()

    # Running counters reported by get_stats().
    self.stats = dict(
        total_processed=0,
        echo_detected=0,
        false_positives=0,
        processing_time_avg=0.0,
        user_interrupts_detected=0,
    )

    logger.info("🔇 回声消除引擎已初始化")

    # Validate the configuration and report the result.
    config_errors = conf.validate_config()
    if not config_errors:
        logger.info("✅ 回声消除配置验证通过")
    else:
        logger.warning(f"⚠️ 配置验证发现问题: {config_errors}")
|
|
|
+
|
|
|
def set_tts_playing_status(self, is_playing: bool, audio_data: Optional[bytes] = None):
    """Record a change of TTS playback state and fingerprint the audio on start.

    Args:
        is_playing: True when playback starts, False when it stops.
        audio_data: Optional raw audio for the playback that is starting;
            only looked at when ``is_playing`` is True.

    A call that repeats the current state is ignored entirely.
    """
    with self._lock:
        unchanged = (hasattr(self, '_current_playing_status')
                     and self._current_playing_status == is_playing)
        if unchanged:
            # NOTE(review): this also skips audio_data, so only the chunk
            # passed with the first "start" call is fingerprinted — confirm
            # that callers do not stream several chunks per playback.
            return

        now = time.time()
        self._current_playing_status = is_playing
        self.is_playing_tts = is_playing

        if not is_playing:
            # Playback stopped: remember when, for the fade-out window.
            self.last_tts_end_time = now
            return

        # Playback started: remember when.
        self._tts_start_time = now
        if not audio_data:
            return

        tts_print = self._generate_audio_fingerprint(audio_data, now)
        if tts_print:
            self.tts_audio_fingerprints.append(tts_print)
            logger.debug(
                f"🎵 记录TTS音频指纹: {tts_print.fingerprint[:16]}...")
|
|
|
+
|
|
|
def is_echo_audio(self, audio_data: bytes) -> bool:
    """Decide whether a captured audio chunk is the system's own TTS output.

    Three regimes are distinguished:

    * While TTS is playing, everything is treated as echo unless the chunk
      passes both ``_is_very_strong_user_voice`` and
      ``_has_extreme_difference_from_tts``. The first 1.2 seconds of a
      playback are always treated as echo.
    * During the fade-out window after playback ended, a chunk passes only
      if it satisfies ``_has_obvious_voice_characteristics`` and
      ``_has_basic_difference_from_tts``.
    * Otherwise the chunk is compared against the stored TTS fingerprints.

    Every evaluated chunk updates ``total_processed`` and the running
    average processing time, and ``echo_detected`` is incremented whenever
    the verdict is True. Previously the early-return paths bumped
    ``echo_detected`` without ``total_processed``, which let the reported
    detection rate exceed 1.

    Args:
        audio_data: Raw audio bytes of the captured chunk.

    Returns:
        True if the chunk should be filtered as echo. False when the engine
        is disabled, the input is empty, or an internal error occurs.
    """
    if not self.is_enabled or not audio_data:
        return False

    start_time = time.time()

    try:
        with self._lock:
            current_time = time.time()

            if self.is_playing_tts:
                # Strict regime: echo unless proven otherwise.
                is_echo = True
                tts_start_time = getattr(self, '_tts_start_time', 0)
                if current_time - tts_start_time < 1.2:
                    # Playback just started: filter preventively.
                    pass
                else:
                    current_fingerprint = self._generate_audio_fingerprint(
                        audio_data, current_time)
                    # No fingerprint -> stay with the "echo" default.
                    if current_fingerprint and self._is_very_strong_user_voice(current_fingerprint):
                        if self._has_extreme_difference_from_tts(current_fingerprint):
                            is_echo = False
                        else:
                            logger.debug("🚨 强用户语音但与TTS差异不够,仍然过滤")

            elif current_time - self.last_tts_end_time < self.tts_fade_out_duration:
                # Fade-out regime: more lenient, but still echo by default.
                current_fingerprint = self._generate_audio_fingerprint(
                    audio_data, current_time)
                if not current_fingerprint:
                    is_echo = True
                elif (self._has_obvious_voice_characteristics(current_fingerprint)
                        and self._has_basic_difference_from_tts(current_fingerprint)):
                    is_echo = False
                else:
                    is_echo = True
                    logger.debug(
                        f"🚨 TTS淡出期内过滤音频(距离TTS结束 {current_time - self.last_tts_end_time:.1f}s)")

            else:
                # Idle regime: compare against stored TTS fingerprints.
                current_fingerprint = self._generate_audio_fingerprint(
                    audio_data, current_time)
                if not current_fingerprint:
                    is_echo = False
                else:
                    is_echo = self._compare_with_tts_fingerprints(
                        current_fingerprint)

            # Shared bookkeeping for every verdict.
            if is_echo:
                self.stats['echo_detected'] += 1
            self.stats['total_processed'] += 1

            processing_time = time.time() - start_time
            processed = self.stats['total_processed']
            self.stats['processing_time_avg'] = (
                self.stats['processing_time_avg'] * (processed - 1) +
                processing_time
            ) / processed

            return is_echo

    except Exception as e:
        logger.error(f"❌ 回声检测失败: {e}")
        # On failure do not claim echo, so normal recognition keeps working.
        return False
|
|
|
+
|
|
|
def _generate_audio_fingerprint(self, audio_data: bytes, timestamp: float) -> Optional[AudioFingerprint]:
    """Turn a raw audio chunk into an ``AudioFingerprint``.

    The bytes are read as 16-bit integers and converted to float32. The
    energy is the mean of the squared samples; chunks below
    ``self.energy_threshold`` are rejected.

    Args:
        audio_data: Raw audio bytes.
        timestamp: Time value stored on the resulting fingerprint.

    Returns:
        The fingerprint, or None for empty/quiet input or on any error.
    """
    try:
        pcm = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32)
        sample_count = len(pcm)
        if sample_count == 0:
            return None

        # Energy gate: quiet chunks get no fingerprint.
        energy = float(np.mean(pcm ** 2))
        if energy < self.energy_threshold:
            return None

        spectral_features = self._extract_spectral_features(pcm)
        feature_mean = np.mean(spectral_features)
        feature_std = np.std(spectral_features)

        # Hash a short textual summary of the chunk.
        summary = "_".join((
            f"{energy:.2f}",
            f"{sample_count}",
            f"{feature_mean:.4f}",
            f"{feature_std:.4f}",
        ))
        digest = hashlib.md5(summary.encode()).hexdigest()

        duration = sample_count / self.sample_rate

        if EchoCancellationConf.should_log_audio_fingerprints():
            logger.debug(
                f"🎵 生成音频指纹: 能量={energy:.1f}, 时长={duration:.3f}s, 特征均值={feature_mean:.3f}")

        return AudioFingerprint(
            fingerprint=digest,
            timestamp=timestamp,
            duration=duration,
            energy=energy,
            spectral_features=spectral_features,
        )

    except Exception as e:
        logger.error(f"❌ 生成音频指纹失败: {e}")
        return None
|
|
|
+
|
|
|
+ def _extract_spectral_features(self, samples: np.ndarray) -> List[float]:
|
|
|
+ """提取频谱特征"""
|
|
|
+ try:
|
|
|
+ # 确保样本长度足够进行FFT
|
|
|
+ if len(samples) < self.fft_size:
|
|
|
+ # 零填充
|
|
|
+ padded_samples = np.zeros(self.fft_size)
|
|
|
+ padded_samples[:len(samples)] = samples
|
|
|
+ samples = padded_samples
|
|
|
+
|
|
|
+ # 应用窗函数
|
|
|
+ windowed = samples[:self.fft_size] * np.hanning(self.fft_size)
|
|
|
+
|
|
|
+ # FFT变换
|
|
|
+ fft_result = np.fft.fft(windowed)
|
|
|
+ magnitude_spectrum = np.abs(fft_result[:self.fft_size//2])
|
|
|
+
|
|
|
+ # 计算mel频率特征
|
|
|
+ mel_features = self._compute_mel_features(magnitude_spectrum)
|
|
|
+
|
|
|
+ return mel_features.tolist()
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ 提取频谱特征失败: {e}")
|
|
|
+ return [0.0] * self.mel_filters
|
|
|
+
|
|
|
+ def _compute_mel_features(self, magnitude_spectrum: np.ndarray) -> np.ndarray:
|
|
|
+ """计算Mel频率特征"""
|
|
|
+ try:
|
|
|
+ # 简化的Mel滤波器组
|
|
|
+ mel_filters = np.linspace(
|
|
|
+ 0, len(magnitude_spectrum), self.mel_filters + 2)
|
|
|
+ mel_features = np.zeros(self.mel_filters)
|
|
|
+
|
|
|
+ for i in range(self.mel_filters):
|
|
|
+ start_idx = int(mel_filters[i])
|
|
|
+ end_idx = int(mel_filters[i + 2])
|
|
|
+ if end_idx > start_idx:
|
|
|
+ mel_features[i] = np.mean(
|
|
|
+ magnitude_spectrum[start_idx:end_idx])
|
|
|
+
|
|
|
+ # 对数变换
|
|
|
+ mel_features = np.log(mel_features + 1e-10)
|
|
|
+
|
|
|
+ return mel_features
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ 计算Mel特征失败: {e}")
|
|
|
+ return np.zeros(self.mel_filters)
|
|
|
+
|
|
|
def _compare_with_tts_fingerprints(self, current_fingerprint: AudioFingerprint) -> bool:
    """Check whether a fingerprint matches any stored TTS fingerprint.

    Stored entries older than ``self.time_window`` (relative to the
    candidate's timestamp) are ignored. A match is either an identical
    hash or a positive ``_is_similar_audio`` result.

    Returns:
        True on the first match, otherwise False (also on any error).
    """
    try:
        now = current_fingerprint.timestamp
        return any(
            current_fingerprint.fingerprint == stored.fingerprint
            or self._is_similar_audio(current_fingerprint, stored)
            for stored in self.tts_audio_fingerprints
            if not (now - stored.timestamp > self.time_window)
        )

    except Exception as e:
        logger.error(f"❌ 指纹比较失败: {e}")
        return False
|
|
|
+
|
|
|
def _is_similar_audio(self, fp1: AudioFingerprint, fp2: AudioFingerprint) -> bool:
    """Tell whether two fingerprints look like the same audio.

    Both of the following must hold:

    * the smaller energy is at least half of the larger one;
    * the cosine similarity of the spectral feature vectors exceeds
      ``self.correlation_threshold``.

    Returns:
        True when both conditions hold. False otherwise, including
        mismatched feature lengths, a zero-norm vector, or any error.
    """
    try:
        quieter = min(fp1.energy, fp2.energy)
        louder = max(fp1.energy, fp2.energy)
        if quieter / louder < 0.5:
            return False

        if len(fp1.spectral_features) != len(fp2.spectral_features):
            return False

        vec_a = np.array(fp1.spectral_features)
        vec_b = np.array(fp2.spectral_features)
        norm_a = np.linalg.norm(vec_a)
        norm_b = np.linalg.norm(vec_b)

        # Cosine similarity is undefined for a zero vector.
        if not (norm_a and norm_b):
            return False

        return np.dot(vec_a, vec_b) / (norm_a * norm_b) > self.correlation_threshold

    except Exception as e:
        logger.error(f"❌ 音频相似性计算失败: {e}")
        return False
|
|
|
+
|
|
|
def _is_likely_user_voice(self, fingerprint: AudioFingerprint) -> bool:
    """Heuristically decide whether a chunk is the user speaking.

    Checks, in order:

    1. energy is at least ``energy_threshold * energy_multiplier``;
    2. there are at least 8 feature bands and the mean of bands 4-7
       exceeds both ``low_freq_weight`` times the mean of bands 0-3 and
       the mean of the remaining bands (0 when there are exactly 8);
    3. ``_has_significant_difference_from_tts`` agrees.

    On success the ``user_interrupts_detected`` counter is incremented.

    Returns:
        True if every check passes; False otherwise or on any error.
    """
    try:
        settings = self.voice_detection_config

        # 1. Energy gate.
        min_energy = self.energy_threshold * settings['energy_multiplier']
        if fingerprint.energy < min_energy:
            return False

        # 2. Band balance; needs at least 8 bands.
        bands = np.array(fingerprint.spectral_features)
        if len(bands) < 8:
            return False

        low_band = np.mean(bands[:4])
        mid_band = np.mean(bands[4:8])
        high_band = np.mean(bands[8:]) if len(bands) > 8 else 0

        low_weight = settings['low_freq_weight']
        mid_dominates = mid_band > low_band * low_weight and mid_band > high_band

        # 3. Must also differ from recent TTS output.
        if mid_dominates and self._has_significant_difference_from_tts(fingerprint):
            self.stats['user_interrupts_detected'] += 1
            return True

        return False

    except Exception as e:
        logger.error(f"❌ 用户语音判断失败: {e}")
        return False
|
|
|
+
|
|
|
def _has_significant_difference_from_tts(self, fingerprint: AudioFingerprint) -> bool:
    """Check that a chunk is clearly different from recent TTS fingerprints.

    Only stored fingerprints newer than
    ``EchoCancellationConf.TTS_REFERENCE_WINDOW`` are considered, and of
    those only the last ``recent_tts_check_count``. A stored fingerprint
    counts as "too similar" when its relative energy gap is within
    ``energy_diff_threshold`` and the mean absolute spectral gap is below
    ``spectral_diff_threshold``.

    Returns:
        False if any considered fingerprint is too similar. True otherwise,
        including when there is nothing to compare against or on any error.
    """
    try:
        if not self.tts_audio_fingerprints:
            return True  # no TTS reference at all

        now = fingerprint.timestamp
        window = EchoCancellationConf.TTS_REFERENCE_WINDOW
        recent = [
            stored for stored in self.tts_audio_fingerprints
            if now - stored.timestamp < window
        ]
        if not recent:
            return True  # no recent TTS reference

        settings = self.voice_detection_config
        check_count = settings['recent_tts_check_count']
        energy_diff_threshold = settings['energy_diff_threshold']
        spectral_diff_threshold = settings['spectral_diff_threshold']

        for tts_fp in recent[-check_count:]:
            louder = max(fingerprint.energy, tts_fp.energy)
            energy_gap = abs(fingerprint.energy - tts_fp.energy) / louder
            if energy_gap > energy_diff_threshold:
                # Energy alone already separates the two.
                continue

            if len(fingerprint.spectral_features) != len(tts_fp.spectral_features):
                continue

            ours = np.array(fingerprint.spectral_features)
            theirs = np.array(tts_fp.spectral_features)
            if np.mean(np.abs(ours - theirs)) < spectral_diff_threshold:
                return False  # spectrum too close: probably echo

        return True

    except Exception as e:
        logger.error(f"❌ TTS差异检查失败: {e}")
        return True  # on error lean towards "user voice"
|
|
|
+
|
|
|
+ def _update_processing_time(self, processing_time: float):
|
|
|
+ """更新处理时间统计"""
|
|
|
+ if self.stats['total_processed'] > 0:
|
|
|
+ alpha = 0.1 # 平滑因子
|
|
|
+ self.stats['processing_time_avg'] = (
|
|
|
+ alpha * processing_time +
|
|
|
+ (1 - alpha) * self.stats['processing_time_avg']
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ self.stats['processing_time_avg'] = processing_time
|
|
|
+
|
|
|
def cleanup_old_fingerprints(self):
    """Drop stored TTS fingerprints older than ``self.time_window``.

    Entries are removed from the left end of the deque until the leftmost
    one is young enough. Errors are logged and swallowed.
    """
    try:
        with self._lock:
            now = time.time()
            stored = self.tts_audio_fingerprints
            while stored:
                age = now - stored[0].timestamp
                if not (age > self.time_window):
                    break
                stored.popleft()

    except Exception as e:
        logger.error(f"❌ 清理指纹失败: {e}")
|
|
|
+
|
|
|
def get_stats(self) -> Dict:
    """Return a snapshot of the counters plus a few derived values.

    Rates are computed against ``max(total_processed, 1)`` so they are
    defined before anything has been processed. The average processing
    time is reported in milliseconds.

    Returns:
        A new dict with the counters, the derived rates, the number of
        stored fingerprints and a nested ``'config'`` dict.
    """
    with self._lock:
        counters = self.stats
        total = counters['total_processed']
        divisor = max(total, 1)

        snapshot = {
            'total_processed': total,
            'echo_detected': counters['echo_detected'],
            'user_interrupts_detected': counters['user_interrupts_detected'],
        }
        snapshot['echo_detection_rate'] = counters['echo_detected'] / divisor
        snapshot['interrupt_success_rate'] = counters['user_interrupts_detected'] / divisor
        snapshot['processing_time_avg_ms'] = counters['processing_time_avg'] * 1000
        snapshot['fingerprints_stored'] = len(self.tts_audio_fingerprints)
        snapshot['config'] = {
            'is_enabled': self.is_enabled,
            'interrupt_during_playback': EchoCancellationConf.ENABLE_INTERRUPT_DURING_PLAYBACK,
            'energy_threshold': self.energy_threshold,
            'user_voice_threshold': self.energy_threshold * self.voice_detection_config['energy_multiplier'],
        }
        return snapshot
|
|
|
+
|
|
|
def enable(self):
    """Switch echo cancellation on and log the change."""
    self.is_enabled = True
    logger.info("✅ 回声消除已启用")
|
|
|
+
|
|
|
def disable(self):
    """Switch echo cancellation off and log the change."""
    self.is_enabled = False
    logger.info("❌ 回声消除已禁用")
|
|
|
+
|
|
|
def _is_tts_variant_audio(self, fingerprint: AudioFingerprint) -> bool:
    """Detect whether a chunk is a variant of recent TTS audio.

    "Variant" here means TTS output as it would look after travelling
    from the speaker to the microphone. Stored fingerprints inside
    ``variant_detection_window`` are examined; thresholds come from
    ``self.tts_filtering_config``.

    Returns:
        True if any examined fingerprint matches; False otherwise, when
        there is nothing to compare against, or on any error.

    NOTE(review): the source this block was recovered from had lost its
    indentation. The nesting below, in particular the level of step 4, is
    a reconstruction and must be confirmed against the real file.
    """
    try:
        if not self.tts_audio_fingerprints:
            return False

        current_time = fingerprint.timestamp
        detection_window = self.tts_filtering_config['variant_detection_window']

        # Keep only the TTS fingerprints inside the detection window.
        recent_tts_fingerprints = [
            fp for fp in self.tts_audio_fingerprints
            if current_time - fp.timestamp < detection_window
        ]

        if not recent_tts_fingerprints:
            return False

        energy_range = self.tts_filtering_config['energy_attenuation_range']
        similarity_threshold = self.tts_filtering_config['variant_similarity_threshold']
        correlation_threshold = self.tts_filtering_config['frequency_correlation_threshold']

        # Examine every recent TTS fingerprint, not only the last few.
        for tts_fp in recent_tts_fingerprints:
            # 1. Time gate: only fingerprints less than one second old.
            time_diff = current_time - tts_fp.timestamp
            if time_diff < 1.0:  # within one second

                # 2. Energy attenuation: the ratio must fall inside the configured range.
                if tts_fp.energy > 0:  # avoid division by zero
                    energy_ratio = fingerprint.energy / tts_fp.energy
                    if energy_range[0] <= energy_ratio <= energy_range[1]:

                        # 3. Spectral shape similarity.
                        if len(fingerprint.spectral_features) == len(tts_fp.spectral_features):
                            features1 = np.array(
                                fingerprint.spectral_features)
                            features2 = np.array(tts_fp.spectral_features)

                            # Normalise both vectors so overall level does not matter.
                            if np.linalg.norm(features1) > 0 and np.linalg.norm(features2) > 0:
                                features1_norm = features1 / \
                                    np.linalg.norm(features1)
                                features2_norm = features2 / \
                                    np.linalg.norm(features2)

                                # Cosine similarity of the normalised vectors.
                                similarity = np.dot(
                                    features1_norm, features2_norm)

                                # High similarity: treat as a TTS variant.
                                if similarity > similarity_threshold:
                                    if EchoCancellationConf.should_log_detection_details():
                                        logger.debug(
                                            f"🚨 TTS变种检测: 时间差={time_diff:.3f}s, 能量比={energy_ratio:.3f}, 相似度={similarity:.3f}")
                                    return True

                # 4. Frequency distribution correlation.
                # NOTE(review): placed directly inside the time gate; the
                # original nesting level could not be recovered — confirm.
                if self._has_similar_frequency_distribution(fingerprint, tts_fp, correlation_threshold):
                    if EchoCancellationConf.should_log_detection_details():
                        logger.debug(f"🚨 TTS变种检测: 频率分布相似,时间差={time_diff:.3f}s")
                    return True

        return False

    except Exception as e:
        logger.error(f"❌ TTS变种检测失败: {e}")
        return False
|
|
|
+
|
|
|
def _has_similar_frequency_distribution(self, fp1: AudioFingerprint, fp2: AudioFingerprint, threshold: float = 0.5) -> bool:
    """Compare two fingerprints by the correlation of their band features.

    Args:
        fp1: First fingerprint.
        fp2: Second fingerprint.
        threshold: The correlation coefficient must be above this value.

    Returns:
        True when the feature vectors have equal length, more than one
        element, and a non-NaN correlation coefficient above ``threshold``.
        False otherwise or on any error.
    """
    try:
        if len(fp1.spectral_features) != len(fp2.spectral_features):
            return False

        bands_a = np.array(fp1.spectral_features)
        bands_b = np.array(fp2.spectral_features)

        # A correlation needs at least two points.
        if len(bands_a) <= 1:
            return False

        correlation = np.corrcoef(bands_a, bands_b)[0, 1]
        return not np.isnan(correlation) and correlation > threshold

    except Exception as e:
        logger.error(f"❌ 频率分布比较失败: {e}")
        return False
|
|
|
+
|
|
|
def _is_definitely_user_voice(self, fingerprint: AudioFingerprint) -> bool:
    """Strictly decide whether a chunk is certainly the user speaking.

    All of these must pass:

    1. energy of at least ``energy_threshold * energy_multiplier *
       definite_voice_energy_multiplier``;
    2. with 8 or more bands, the mean of bands 4-7 exceeds 0.8 times the
       mean of bands 0-3 and 1.2 times the mean of the remaining bands
       (0 when there are exactly 8);
    3. the spread (max minus min) of the features is at least
       ``min_spectral_range``;
    4. ``_has_significant_difference_from_all_tts``;
    5. ``_has_sufficient_complexity``.

    On success the ``user_interrupts_detected`` counter is incremented.

    Returns:
        True if every check passes; False otherwise or on any error.
    """
    try:
        filtering = self.tts_filtering_config

        # 1. Raised energy floor.
        energy_multiplier = filtering['definite_voice_energy_multiplier']
        energy_floor = self.energy_threshold * \
            self.voice_detection_config['energy_multiplier'] * \
            energy_multiplier
        if fingerprint.energy < energy_floor:
            return False

        # 2. Band balance (only evaluated with enough bands).
        bands = np.array(fingerprint.spectral_features)
        if len(bands) >= 8:
            low_band = np.mean(bands[:4])
            mid_band = np.mean(bands[4:8])
            high_band = np.mean(bands[8:]) if len(bands) > 8 else 0

            mid_dominates = (mid_band > low_band * 0.8 and
                             mid_band > high_band * 1.2)
            if not mid_dominates:
                return False

        # 3. Dynamic range of the features.
        min_spectral_range = filtering['min_spectral_range']
        if np.max(bands) - np.min(bands) < min_spectral_range:
            return False

        # 4 and 5. Difference from all TTS audio, then complexity.
        passes_rest = (self._has_significant_difference_from_all_tts(fingerprint)
                       and self._has_sufficient_complexity(fingerprint))
        if not passes_rest:
            return False

        self.stats['user_interrupts_detected'] += 1
        return True

    except Exception as e:
        logger.error(f"❌ 确定用户语音判断失败: {e}")
        return False
|
|
|
+
|
|
|
def _has_significant_difference_from_all_tts(self, fingerprint: AudioFingerprint) -> bool:
    """Require a chunk to differ from every TTS fingerprint of the last 3 seconds.

    A stored fingerprint disqualifies the chunk when the smaller-to-larger
    energy ratio is above 0.7, or when the feature vectors have equal
    length, are both non-zero, and have a cosine similarity above
    ``max_similarity_with_tts``.

    Returns:
        True if nothing disqualifies the chunk, including when there is
        nothing to compare against. False otherwise or on any error.
    """
    try:
        if not self.tts_audio_fingerprints:
            return True

        now = fingerprint.timestamp
        max_similarity = self.tts_filtering_config['max_similarity_with_tts']

        recent = [
            stored for stored in self.tts_audio_fingerprints
            if now - stored.timestamp < 3.0
        ]

        # An empty list simply skips the loop and yields True.
        for tts_fp in recent:
            quieter = min(fingerprint.energy, tts_fp.energy)
            louder = max(fingerprint.energy, tts_fp.energy)
            if quieter / louder > 0.7:
                return False  # energies too alike

            if len(fingerprint.spectral_features) != len(tts_fp.spectral_features):
                continue

            ours = np.array(fingerprint.spectral_features)
            theirs = np.array(tts_fp.spectral_features)
            norm_ours = np.linalg.norm(ours)
            norm_theirs = np.linalg.norm(theirs)

            if norm_ours > 0 and norm_theirs > 0:
                # Cosine similarity of the normalised vectors.
                similarity = np.dot(ours / norm_ours, theirs / norm_theirs)
                if similarity > max_similarity:
                    return False  # spectra too alike

        return True

    except Exception as e:
        logger.error(f"❌ 全TTS差异检查失败: {e}")
        return False
|
|
|
+
|
|
|
def _has_sufficient_complexity(self, fingerprint: AudioFingerprint) -> bool:
    """Check that the band features vary enough and peak away from the edges.

    * With more than one band, the standard deviation must be at least
      ``min_spectral_variation``.
    * With six or more bands, the index of the largest band must lie
      between 2 and ``len - 2`` inclusive.

    Returns:
        True when both applicable checks pass; False otherwise or on any
        error.
    """
    try:
        bands = np.array(fingerprint.spectral_features)
        min_variation = self.tts_filtering_config['min_spectral_variation']
        band_count = len(bands)

        # 1. Spectral variation.
        if band_count > 1 and np.std(bands) < min_variation:
            return False

        # 2. Peak position: a peak at either edge is rejected.
        if band_count >= 6:
            peak = np.argmax(bands)
            if not (2 <= peak <= band_count - 2):
                return False

        return True

    except Exception as e:
        logger.error(f"❌ 复杂度检查失败: {e}")
        return False
|
|
|
+
|
|
|
def _is_likely_user_voice_relaxed(self, fingerprint: AudioFingerprint) -> bool:
    """Decide whether a chunk is user speech (non-strict interrupt mode).

    Decision sequence:

    1. Energy below ``energy_threshold * 5`` -> not user voice.
    2. Energy above ``energy_threshold * 20`` -> the answer is whatever
       ``_has_strict_difference_from_tts`` says.
    3. With six or more bands: if the mean of bands 2-4 exceeds 0.3 times
       the overall mean and ``_has_basic_difference_from_tts`` agrees ->
       user voice.
    4. If the newest stored TTS fingerprint is more than 1.5 seconds older
       than this chunk -> user voice.
    5. Otherwise -> not user voice.

    The former extra branch for energy above ``energy_threshold * 50`` was
    removed: any such chunk already returns in step 2, so it could never
    run.

    Returns:
        The decision above. True on any error, which deliberately leans
        towards allowing an interruption.
    """
    try:
        energy = fingerprint.energy

        # 1. Energy floor.
        if energy < self.energy_threshold * 5.0:
            return False

        # 2. Loud chunks are decided by the strict TTS comparison alone.
        if energy > self.energy_threshold * 20.0:
            if self._has_strict_difference_from_tts(fingerprint):
                logger.debug(
                    f"🎤 高能量({fingerprint.energy:.1f}),通过严格差异检查,认为是用户语音")
                return True
            logger.debug(
                f"🚫 高能量({fingerprint.energy:.1f}),但未通过严格差异检查,可能是噪音")
            return False

        # 3. Band balance for moderately loud chunks.
        features = np.array(fingerprint.spectral_features)
        if len(features) >= 6:
            mid_freq_energy = np.mean(features[2:5])
            total_energy = np.mean(features)
            if (mid_freq_energy > total_energy * 0.3
                    and self._has_basic_difference_from_tts(fingerprint)):
                return True

        # 4. Far enough from the last TTS fingerprint.
        if self.tts_audio_fingerprints:
            last_tts_time = max(
                fp.timestamp for fp in self.tts_audio_fingerprints)
            time_since_last_tts = fingerprint.timestamp - last_tts_time
            if time_since_last_tts > 1.5:
                logger.debug(
                    f"🎤 距离最后TTS较远({time_since_last_tts:.1f}s),认为是用户语音")
                return True

        return False

    except Exception as e:
        logger.error(f"❌ 宽松用户语音判断失败: {e}")
        # On failure lean towards "user voice" so interruptions still work.
        return True
|
|
|
+
|
|
|
def _has_basic_difference_from_tts(self, fingerprint: AudioFingerprint) -> bool:
    """Lenient check that a chunk differs from the most recent TTS fingerprint.

    Only the newest stored fingerprint inside a short window is compared:
    0.3 seconds while TTS is playing, 0.5 seconds otherwise.

    While TTS is playing the chunk is rejected if

    * the smaller-to-larger energy ratio exceeds 0.6, or
    * (equal feature lengths) the Euclidean feature distance is below 0.8,
      or the cosine similarity of two non-zero vectors exceeds 0.5.

    When TTS is not playing the chunk is rejected only if the energy
    ratio exceeds 0.8 and, with equal feature lengths, the Euclidean
    distance is below 0.5.

    Returns:
        False if rejected. True otherwise, including when there is nothing
        to compare against or on any error.
    """
    try:
        if not self.tts_audio_fingerprints:
            return True  # no TTS reference at all

        now = fingerprint.timestamp
        playing = self.is_playing_tts
        window = 0.3 if playing else 0.5

        recent = [
            stored for stored in self.tts_audio_fingerprints
            if now - stored.timestamp < window
        ]
        if not recent:
            return True  # no recent TTS reference

        # Only the newest recent fingerprint is compared.
        tts_fp = recent[-1]
        energy_ratio = min(fingerprint.energy, tts_fp.energy) / \
            max(fingerprint.energy, tts_fp.energy)

        if playing:
            # Stricter limits while TTS is audible.
            if energy_ratio > 0.6:
                logger.debug(
                    f"🚨 TTS播放期间能量过于相似({energy_ratio:.2f}),可能是回声")
                return False

            if len(fingerprint.spectral_features) == len(tts_fp.spectral_features):
                ours = np.array(fingerprint.spectral_features)
                theirs = np.array(tts_fp.spectral_features)

                distance = np.linalg.norm(ours - theirs)
                if distance < 0.8:
                    logger.debug(
                        f"🚨 TTS播放期间频谱距离过小({distance:.2f}),可能是回声")
                    return False

                norm_ours = np.linalg.norm(ours)
                norm_theirs = np.linalg.norm(theirs)
                if norm_ours > 0 and norm_theirs > 0:
                    correlation = np.dot(ours / norm_ours, theirs / norm_theirs)
                    if correlation > 0.5:
                        logger.debug(
                            f"🚨 TTS播放期间频谱相关性过高({correlation:.2f}),可能是回声")
                        return False

            return True

        # Not playing: suspicious only if energy AND spectrum are both close.
        if energy_ratio > 0.8:
            if len(fingerprint.spectral_features) == len(tts_fp.spectral_features):
                ours = np.array(fingerprint.spectral_features)
                theirs = np.array(tts_fp.spectral_features)

                distance = np.linalg.norm(ours - theirs)
                if distance < 0.5:
                    logger.debug(f"🚨 频谱距离过小({distance:.2f}),可能是回声")
                    return False

        return True

    except Exception as e:
        logger.error(f"❌ 基本TTS差异检查失败: {e}")
        return True  # on error lean towards "user voice"
|
|
|
+
|
|
|
def _has_strict_difference_from_tts(self, fingerprint: AudioFingerprint) -> bool:
    """Strict check that a chunk differs from every TTS fingerprint of the last second.

    For each stored fingerprint less than one second old, the chunk is
    rejected if any of these holds:

    * the fingerprint is less than 0.3 seconds old;
    * the smaller-to-larger energy ratio exceeds 0.9;
    * (equal feature lengths) the Euclidean feature distance is below 0.3,
      or the cosine similarity of two non-zero vectors exceeds 0.7.

    Returns:
        False if rejected or on any error. True otherwise, including when
        there is nothing to compare against.
    """
    try:
        if not self.tts_audio_fingerprints:
            return True  # no TTS reference at all

        now = fingerprint.timestamp
        recent = [
            stored for stored in self.tts_audio_fingerprints
            if now - stored.timestamp < 1.0
        ]

        # An empty list simply skips the loop and yields True.
        for tts_fp in recent:
            # 1. Too close in time.
            time_diff = now - tts_fp.timestamp
            if time_diff < 0.3:
                logger.debug(f"🚫 时间过近({time_diff:.2f}s),可能是回声")
                return False

            # 2. Energies too alike.
            energy_ratio = min(fingerprint.energy, tts_fp.energy) / \
                max(fingerprint.energy, tts_fp.energy)
            if energy_ratio > 0.9:
                logger.debug(f"🚫 能量过于相似({energy_ratio:.2f}),可能是回声")
                return False

            # 3. Spectra too alike (only comparable with equal lengths).
            if len(fingerprint.spectral_features) != len(tts_fp.spectral_features):
                continue

            ours = np.array(fingerprint.spectral_features)
            theirs = np.array(tts_fp.spectral_features)

            distance = np.linalg.norm(ours - theirs)
            if distance < 0.3:
                logger.debug(f"🚫 频谱过于相似({distance:.2f}),可能是回声")
                return False

            norm_ours = np.linalg.norm(ours)
            norm_theirs = np.linalg.norm(theirs)
            if norm_ours > 0 and norm_theirs > 0:
                correlation = np.dot(ours / norm_ours, theirs / norm_theirs)
                if correlation > 0.7:
                    logger.debug(f"🚫 频谱相关性过高({correlation:.2f}),可能是回声")
                    return False

        return True

    except Exception as e:
        logger.error(f"❌ 严格TTS差异检查失败: {e}")
        return False  # on error lean towards "echo"
|
|
|
+
|
|
|
def _is_definitely_echo(self, fingerprint: AudioFingerprint) -> bool:
    """Decide whether a chunk is certainly echo (non-strict mode).

    Stored TTS fingerprints less than 1.5 seconds old are examined. A
    fingerprint less than 0.3 seconds old marks the chunk as echo when the
    energy ratio lies in [0.1, 0.7] and the cosine similarity of the
    feature vectors exceeds 0.8. An identical hash also marks it as echo.

    Returns:
        True when echo is established; False otherwise, when there is
        nothing to compare against, or on any error.

    NOTE(review): the source this block was recovered from had lost its
    indentation. The nesting below, in particular the level of step 4, is
    a reconstruction and must be confirmed against the real file.
    """
    try:
        if not self.tts_audio_fingerprints:
            return False  # no TTS reference, so echo cannot be established

        current_time = fingerprint.timestamp

        # Keep only recent TTS fingerprints.
        recent_tts_fingerprints = [
            fp for fp in self.tts_audio_fingerprints
            if current_time - fp.timestamp < 1.5  # TTS within the last 1.5 seconds
        ]

        if not recent_tts_fingerprints:
            return False  # no recent TTS reference

        for tts_fp in recent_tts_fingerprints:
            # 1. Time gate: the TTS fingerprint must be very recent.
            time_diff = current_time - tts_fp.timestamp
            if time_diff < 0.3:  # within 300 ms

                # 2. Energy attenuation check.
                # NOTE(review): unlike _is_tts_variant_audio there is no
                # guard for tts_fp.energy == 0 here; a zero would raise and
                # end the whole check via the except branch.
                energy_ratio = fingerprint.energy / tts_fp.energy
                if 0.1 <= energy_ratio <= 0.7:  # ratio between 0.1 and 0.7

                    # 3. Spectral similarity check.
                    if len(fingerprint.spectral_features) == len(tts_fp.spectral_features):
                        features1 = np.array(fingerprint.spectral_features)
                        features2 = np.array(tts_fp.spectral_features)

                        if np.linalg.norm(features1) > 0 and np.linalg.norm(features2) > 0:
                            # Compare normalised vectors.
                            features1_norm = features1 / \
                                np.linalg.norm(features1)
                            features2_norm = features2 / \
                                np.linalg.norm(features2)
                            similarity = np.dot(
                                features1_norm, features2_norm)

                            # Very high similarity: treat as echo.
                            if similarity > 0.8:
                                return True

                # 4. Exact hash match.
                # NOTE(review): placed directly inside the time gate; the
                # original nesting level could not be recovered — confirm.
                if fingerprint.fingerprint == tts_fp.fingerprint:
                    return True

        return False  # echo could not be established

    except Exception as e:
        logger.error(f"❌ 确定回声判断失败: {e}")
        return False  # on error do not claim echo
|
|
|
+
|
|
|
def _trigger_interrupt(self):
    """Record a debounced "echo detected" event without interrupting playback.

    The interrupt signal itself has been removed: this method only keeps
    the time of the last trigger in ``self._last_trigger_time`` and writes
    a debug log line. Calls arriving less than 0.5 s after the previous
    accepted trigger are ignored.
    """
    try:
        now = time.time()

        # Debounce: the attribute does not exist until the first trigger.
        is_repeat = (hasattr(self, '_last_trigger_time')
                     and now - self._last_trigger_time < 0.5)
        if is_repeat:
            logger.debug("🔇 打断信号防抖:跳过重复触发")
        else:
            self._last_trigger_time = now
            # No pending interrupt request is created any more; the audio
            # is only muted.
            logger.debug("🔇 回声消除检测到音频,进行消音处理但不打断播放")

    except Exception as e:
        import traceback
        logger.error(f"❌ 消音处理失败: {e}")
        logger.error(f"消音处理异常详情: {traceback.format_exc()}")
|
|
|
+
|
|
|
+ def _is_very_strong_user_voice(self, fingerprint: AudioFingerprint) -> bool:
|
|
|
+ """
|
|
|
+ 检测是否为非常强的用户语音
|
|
|
+ 用于在TTS播放期间让明显的用户语音通过,触发VAD和IAT
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # 1. 适中的能量阈值 - 让更多用户语音通过
|
|
|
+ high_energy_threshold = self.energy_threshold * 3.0 # 降低到3倍基础阈值
|
|
|
+ if fingerprint.energy < high_energy_threshold:
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 2. 检查是否有明显的人声特征
|
|
|
+ if not self._has_obvious_voice_characteristics(fingerprint):
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 3. 与TTS音频有足够差异
|
|
|
+ if not self._has_significant_difference_from_tts(fingerprint):
|
|
|
+ return False
|
|
|
+
|
|
|
+ # logger.debug(f"🔊 检测到强用户语音,能量: {fingerprint.energy:.1f}")
|
|
|
+ return True
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ 强用户语音检测失败: {e}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ def _has_extreme_difference_from_tts(self, fingerprint: AudioFingerprint) -> bool:
|
|
|
+ """检查与TTS音频是否有极大差异"""
|
|
|
+ try:
|
|
|
+ if not self.tts_audio_fingerprints:
|
|
|
+ return True
|
|
|
+
|
|
|
+ current_time = fingerprint.timestamp
|
|
|
+
|
|
|
+ # 检查最近的TTS指纹
|
|
|
+ recent_tts_fingerprints = [
|
|
|
+ fp for fp in self.tts_audio_fingerprints
|
|
|
+ if current_time - fp.timestamp < 3.0 # 3秒内的TTS,延长检查时间
|
|
|
+ ]
|
|
|
+
|
|
|
+ if not recent_tts_fingerprints:
|
|
|
+ return True
|
|
|
+
|
|
|
+ # 必须与所有TTS音频都有极大的能量差异
|
|
|
+ for tts_fp in recent_tts_fingerprints:
|
|
|
+ energy_ratio = min(fingerprint.energy, tts_fp.energy) / \
|
|
|
+ max(fingerprint.energy, tts_fp.energy)
|
|
|
+ if energy_ratio > 0.3: # 能量差异阈值降低到0.3,更严格
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 进一步检查频谱特征差异
|
|
|
+ for tts_fp in recent_tts_fingerprints:
|
|
|
+ if self._has_similar_frequency_distribution(fingerprint, tts_fp, threshold=0.6):
|
|
|
+ return False
|
|
|
+
|
|
|
+ return True
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ 极大TTS差异检查失败: {e}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ def _has_obvious_voice_characteristics(self, fingerprint: AudioFingerprint) -> bool:
|
|
|
+ """检查是否有明显的人声特征"""
|
|
|
+ try:
|
|
|
+ features = np.array(fingerprint.spectral_features)
|
|
|
+
|
|
|
+ if len(features) < 6:
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 简单但有效的人声检查
|
|
|
+ low_freq = np.mean(features[:2]) # 低频
|
|
|
+ mid_freq = np.mean(features[2:5]) # 中频
|
|
|
+ high_freq = np.mean(features[5:]) # 高频
|
|
|
+
|
|
|
+ # 放宽人声检测条件
|
|
|
+ # 1. 中频能量较强(主要条件)
|
|
|
+ if mid_freq > low_freq * 0.6 and mid_freq > high_freq * 0.6:
|
|
|
+ return True
|
|
|
+
|
|
|
+ # 2. 或者能量分布相对均匀(人声特征)
|
|
|
+ total_energy = low_freq + mid_freq + high_freq
|
|
|
+ if total_energy > 0:
|
|
|
+ mid_ratio = mid_freq / total_energy
|
|
|
+ if 0.25 < mid_ratio < 0.6: # 中频占比合理范围
|
|
|
+ return True
|
|
|
+
|
|
|
+ return False
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ 明显人声特征检测失败: {e}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 已移除:_is_very_likely_user_voice_during_tts 方法(不再需要TTS期间的打断检测)
|
|
|
+
|
|
|
+ def _has_strong_voice_characteristics(self, fingerprint: AudioFingerprint) -> bool:
|
|
|
+ """检查是否具有强烈的人声特征"""
|
|
|
+ try:
|
|
|
+ features = np.array(fingerprint.spectral_features)
|
|
|
+
|
|
|
+ if len(features) < 8:
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 更严格的频谱分布检查
|
|
|
+ low_freq = np.mean(features[:2]) # 低频
|
|
|
+ mid_freq = np.mean(features[2:6]) # 中频
|
|
|
+ high_freq = np.mean(features[6:]) # 高频
|
|
|
+
|
|
|
+ # 中频能量必须明显强于低频和高频(人声特征)
|
|
|
+ if mid_freq < low_freq * 1.2 or mid_freq < high_freq * 1.5:
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 检查频谱的平滑度 - 人声通常有特定的共振峰
|
|
|
+ spectral_variance = np.var(features)
|
|
|
+ if spectral_variance < 0.1: # 频谱过于平滑可能是TTS
|
|
|
+ return False
|
|
|
+
|
|
|
+ return True
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ 强人声特征检测失败: {e}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ def _has_high_spectral_complexity(self, fingerprint: AudioFingerprint) -> bool:
|
|
|
+ """检查是否具有高频谱复杂度"""
|
|
|
+ try:
|
|
|
+ features = np.array(fingerprint.spectral_features)
|
|
|
+
|
|
|
+ if len(features) < 4:
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 计算频谱的熵(复杂度指标)
|
|
|
+ normalized_features = features / \
|
|
|
+ np.sum(features) if np.sum(features) > 0 else features
|
|
|
+ entropy = -np.sum(normalized_features *
|
|
|
+ np.log(normalized_features + 1e-10))
|
|
|
+
|
|
|
+ # 人声通常有较高的熵值
|
|
|
+ min_entropy = 2.0 # 根据实际情况调整
|
|
|
+ if entropy < min_entropy:
|
|
|
+ return False
|
|
|
+
|
|
|
+ return True
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ 频谱复杂度检测失败: {e}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ def _is_possibly_user_voice_during_tts(self, fingerprint: AudioFingerprint) -> bool:
|
|
|
+ """
|
|
|
+ 在TTS播放期间判断是否可能是用户语音
|
|
|
+ 使用更宽松的标准,主要用于创建待验证的打断请求
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ # 1. 基本能量检查 - 需要足够的能量
|
|
|
+ min_energy_threshold = self.energy_threshold * 2.0 # 降低能量要求
|
|
|
+ if fingerprint.energy < min_energy_threshold:
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 2. 与TTS音频的基本差异检查
|
|
|
+ if not self._has_basic_difference_from_tts(fingerprint):
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 3. 检查是否有人声特征
|
|
|
+ if not self._has_voice_characteristics(fingerprint):
|
|
|
+ return False
|
|
|
+
|
|
|
+ return True
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ TTS期间用户语音检测失败: {e}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ def _has_voice_characteristics(self, fingerprint: AudioFingerprint) -> bool:
|
|
|
+ """检查音频是否具有人声特征"""
|
|
|
+ try:
|
|
|
+ features = np.array(fingerprint.spectral_features)
|
|
|
+
|
|
|
+ if len(features) < 4:
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 检查频谱分布 - 人声通常在中频有较强能量
|
|
|
+ if len(features) >= 8:
|
|
|
+ low_freq = np.mean(features[:3])
|
|
|
+ mid_freq = np.mean(features[3:7])
|
|
|
+ high_freq = np.mean(features[7:]) if len(features) > 7 else 0
|
|
|
+
|
|
|
+ # 中频能量应该相对较强
|
|
|
+ if mid_freq < low_freq * 0.5:
|
|
|
+ return False
|
|
|
+
|
|
|
+ return True
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"❌ 人声特征检测失败: {e}")
|
|
|
+ return False
|
|
|
+
|
|
|
+ # 已移除:_create_pending_interrupt_request 方法(不再需要创建待验证的打断请求)
|
|
|
+
|
|
|
+
|
|
|
class SelfVoiceDetector:
    """Detects the system's own voice so that TTS playback is not re-recognised.

    Wraps an ``EchoCancellationEngine``, forwards TTS playback state to
    it, notifies registered callbacks about state changes, and runs a
    daemon thread that periodically asks the engine to drop old
    fingerprints.
    """

    def __init__(self):
        self.echo_engine = EchoCancellationEngine()
        # Callbacks keyed by name; invoked as callback(event_name, payload).
        self.voice_callbacks: Dict[str, Callable] = {}
        self._lock = threading.RLock()

        # Background cleanup thread; the event is created before the
        # thread so the thread never sees a half-initialised object.
        self._stop_cleanup = threading.Event()
        self._cleanup_thread = threading.Thread(
            target=self._periodic_cleanup, daemon=True)
        self._cleanup_thread.start()

        logger.info("🎯 自我声音检测器已初始化")

    def register_voice_callback(self, name: str, callback: Callable):
        """Register ``callback`` under ``name``, replacing any previous one."""
        with self._lock:
            self.voice_callbacks[name] = callback
            logger.info(f"📝 注册声音检测回调: {name}")

    def unregister_voice_callback(self, name: str):
        """Remove the callback registered under ``name``; unknown names are ignored."""
        with self._lock:
            if self.voice_callbacks.pop(name, None) is not None:
                logger.info(f"🗑️ 注销声音检测回调: {name}")

    def set_tts_playing(self, is_playing: bool, audio_data: Optional[bytes] = None):
        """Forward the TTS playback state to the engine and notify callbacks.

        Every registered callback is called with the event name
        ``'tts_status_changed'`` and a dict holding ``is_playing`` and
        ``timestamp``. A failing callback is logged and does not prevent
        the remaining callbacks from running.
        """
        self.echo_engine.set_tts_playing_status(is_playing, audio_data)

        # Iterate over a snapshot taken under the lock and call the
        # callbacks outside of it. A callback may then register or
        # unregister callbacks without changing the dict mid-iteration
        # (which raises RuntimeError), and slow callbacks do not block
        # other threads.
        with self._lock:
            callbacks = list(self.voice_callbacks.items())

        for name, callback in callbacks:
            try:
                callback('tts_status_changed', {
                    'is_playing': is_playing,
                    'timestamp': time.time()
                })
            except Exception as e:
                logger.error(f"❌ 声音检测回调 {name} 执行失败: {e}")

    def should_ignore_audio(self, audio_data: bytes) -> bool:
        """Return whether the engine classifies ``audio_data`` as echo."""
        return self.echo_engine.is_echo_audio(audio_data)

    def process_recording_audio(self, audio_data: bytes) -> bool:
        """Return True when ``audio_data`` should be processed further.

        Audio classified as echo by ``should_ignore_audio`` yields False.
        """
        return not self.should_ignore_audio(audio_data)

    def _periodic_cleanup(self):
        """Ask the engine to drop old fingerprints until shutdown is requested.

        Runs every 0.1 s and backs off for 1 s after a failed pass.
        Waiting on the stop event instead of sleeping lets ``shutdown``
        end the loop immediately.
        """
        while not self._stop_cleanup.is_set():
            try:
                self.echo_engine.cleanup_old_fingerprints()
                delay = 0.1
            except Exception as e:
                logger.error(f"❌ 定期清理失败: {e}")
                delay = 1.0  # wait longer after an error
            self._stop_cleanup.wait(delay)

    def get_detection_stats(self) -> Dict:
        """Return the statistics reported by the engine."""
        return self.echo_engine.get_stats()

    def enable_echo_cancellation(self):
        """Enable echo cancellation in the engine."""
        self.echo_engine.enable()

    def disable_echo_cancellation(self):
        """Disable echo cancellation in the engine."""
        self.echo_engine.disable()

    def shutdown(self):
        """Stop the cleanup thread, waiting up to two seconds for it to exit."""
        logger.info("🔄 关闭自我声音检测器...")
        self._stop_cleanup.set()
        if self._cleanup_thread.is_alive():
            self._cleanup_thread.join(timeout=2.0)
        logger.info("✅ 自我声音检测器已关闭")
|
|
|
+
|
|
|
+
|
|
|
# Module-level singleton state.
# Holds the shared detector; None until get_self_voice_detector() creates it
# and again after cleanup_self_voice_detector() has shut it down.
_self_voice_detector: Optional[SelfVoiceDetector] = None
# Held by get_self_voice_detector() and cleanup_self_voice_detector() while
# they read or replace the singleton.
_detector_lock = threading.Lock()
|
|
|
+
|
|
|
+
|
|
|
def get_self_voice_detector() -> SelfVoiceDetector:
    """Return the shared ``SelfVoiceDetector``, creating it on first use.

    Creation happens while holding ``_detector_lock``, so concurrent
    callers all receive the same instance.
    """
    global _self_voice_detector

    with _detector_lock:
        detector = _self_voice_detector
        if detector is None:
            detector = SelfVoiceDetector()
            _self_voice_detector = detector
    return detector
|
|
|
+
|
|
|
+
|
|
|
def cleanup_self_voice_detector():
    """Shut down and discard the shared detector, if one exists.

    Runs under ``_detector_lock``. Does nothing when no detector has been
    created; afterwards the next ``get_self_voice_detector()`` call
    creates a new instance.
    """
    global _self_voice_detector

    with _detector_lock:
        detector = _self_voice_detector
        if detector is None:
            return
        detector.shutdown()
        _self_voice_detector = None
|