| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297 |
- # -*- coding: utf-8 -*-
- """
- 自我声音检测和回声消除模块
- 防止系统在播放TTS时误触发语音识别
- """
- import threading
- import time
- import numpy as np
- from typing import Optional, Callable, Dict, List, Tuple
- from utils.logger import logger
- import hashlib
- import json
- from collections import deque
- from dataclasses import dataclass
- import struct
- from config.config.echo_cancellation_conf import EchoCancellationConf
@dataclass
class AudioFingerprint:
    """Feature summary of one audio chunk, used to match microphone input against TTS output."""

    # MD5 hex digest built from the chunk's energy, length and spectral statistics.
    fingerprint: str
    # Moment the chunk was observed; callers in this file pass time.time().
    timestamp: float
    # Number of samples divided by the engine's sample rate.
    duration: float
    # Mean of the squared sample values.
    energy: float
    # One log-magnitude value per frequency band.
    spectral_features: List[float]
- class EchoCancellationEngine:
- """回声消除引擎"""
def __init__(self):
    """Read all settings from ``EchoCancellationConf`` and reset runtime state.

    Every piece of per-instance state is created here, including the two
    timestamps (``_tts_start_time`` and ``_last_trigger_time``) that other
    methods would otherwise create on first use and have to probe with
    ``getattr``/``hasattr``.
    """
    self.is_enabled = EchoCancellationConf.ENABLE_ECHO_CANCELLATION
    self.is_playing_tts = False
    self._current_playing_status = False
    # 0.0 matches the fallback used by readers of these attributes, so
    # initialising them does not change any decision.
    self._tts_start_time = 0.0
    self._last_trigger_time = 0.0
    # Bounded histories: old entries fall off automatically.
    self.tts_audio_fingerprints: deque = deque(
        maxlen=EchoCancellationConf.MAX_TTS_FINGERPRINTS)
    self.recording_buffer: deque = deque(
        maxlen=EchoCancellationConf.MAX_RECORDING_BUFFER)

    # Signal-processing parameters.
    self.sample_rate = EchoCancellationConf.SAMPLE_RATE
    self.frame_size = EchoCancellationConf.FRAME_SIZE
    self.overlap_ratio = EchoCancellationConf.OVERLAP_RATIO
    self.energy_threshold = EchoCancellationConf.ENERGY_THRESHOLD
    self.correlation_threshold = EchoCancellationConf.CORRELATION_THRESHOLD
    self.time_window = EchoCancellationConf.TIME_WINDOW

    # State management. The lock is re-entrant because locked methods call
    # other methods of this object.
    self._lock = threading.RLock()
    self.last_tts_end_time = 0
    self.tts_fade_out_duration = EchoCancellationConf.TTS_FADE_OUT_DURATION

    # Spectral-analysis parameters.
    self.fft_size = EchoCancellationConf.FFT_SIZE
    self.mel_filters = EchoCancellationConf.MEL_FILTERS

    # User-voice detection settings.
    self.voice_detection_config = EchoCancellationConf.get_voice_detection_config()
    # TTS filtering settings (copied so local edits do not leak back).
    self.tts_filtering_config = EchoCancellationConf.TTS_FILTERING.copy()

    # Running counters reported by get_stats().
    self.stats = {
        'total_processed': 0,
        'echo_detected': 0,
        'false_positives': 0,
        'processing_time_avg': 0.0,
        'user_interrupts_detected': 0
    }
    logger.info("🔇 回声消除引擎已初始化")

    # Validate the configuration and report the outcome.
    config_errors = EchoCancellationConf.validate_config()
    if config_errors:
        logger.warning(f"⚠️ 配置验证发现问题: {config_errors}")
    else:
        logger.info("✅ 回声消除配置验证通过")
- def set_tts_playing_status(self, is_playing: bool, audio_data: Optional[bytes] = None):
- """设置TTS播放状态并记录音频指纹"""
- with self._lock:
- # 避免重复设置相同状态
- if hasattr(self, '_current_playing_status') and self._current_playing_status == is_playing:
- return
- self._current_playing_status = is_playing
- self.is_playing_tts = is_playing
- current_time = time.time()
- if is_playing:
- # 记录TTS开始时间
- self._tts_start_time = current_time
- if audio_data:
- # 生成TTS音频指纹
- fingerprint = self._generate_audio_fingerprint(
- audio_data, current_time)
- if fingerprint:
- self.tts_audio_fingerprints.append(fingerprint)
- logger.debug(
- f"🎵 记录TTS音频指纹: {fingerprint.fingerprint[:16]}...")
- else:
- self.last_tts_end_time = current_time
- # logger.debug("🔇 TTS播放结束,开始淡出期")
- def is_echo_audio(self, audio_data: bytes) -> bool:
- """检测音频是否为回声(自我声音)"""
- if not self.is_enabled or not audio_data:
- return False
- start_time = time.time()
- try:
- with self._lock:
- current_time = time.time()
- # 如果正在播放TTS,采用更严格的过滤策略
- if self.is_playing_tts:
- # 检查TTS播放是否刚刚开始,给音频指纹建立一些时间
- tts_start_time = getattr(self, '_tts_start_time', 0)
- if current_time - tts_start_time < 1.2: # TTS开始1.2秒内,延长过滤时间
- # 直接认为是回声,给系统时间建立指纹,防止误触发打断
- self.stats['echo_detected'] += 1
- # logger.debug("🚨 TTS刚开始播放,预防性过滤音频")
- return True
- # 生成当前音频的指纹进行分析
- current_fingerprint = self._generate_audio_fingerprint(
- audio_data, current_time)
- if not current_fingerprint:
- # 无法生成指纹时默认认为是回声
- self.stats['echo_detected'] += 1
- # logger.debug("🚨 无法生成音频指纹,默认过滤")
- return True
- # TTS播放期间,采用更严格的过滤策略
- # 只有非常明显的用户语音才允许通过
- if self._is_very_strong_user_voice(current_fingerprint):
- # 进一步检查与TTS的差异,使用更严格的标准
- if self._has_extreme_difference_from_tts(current_fingerprint):
- # logger.debug("🎤 检测到非常强烈的用户语音特征且与TTS有极大差异,允许通过")
- return False
- else:
- # 即使能量很强,但与TTS差异不够大,仍然过滤
- self.stats['echo_detected'] += 1
- logger.debug("🚨 强用户语音但与TTS差异不够,仍然过滤")
- return True
- # 其他情况全部过滤,防止回声
- self.stats['echo_detected'] += 1
- # logger.debug("🚨 TTS播放期间严格过滤音频,防止回声")
- return True
- # 检查是否在TTS结束后的淡出期内
- if current_time - self.last_tts_end_time < self.tts_fade_out_duration:
- # 在TTS淡出期内,采用更宽松的过滤策略
- # 生成当前音频的指纹进行分析
- current_fingerprint = self._generate_audio_fingerprint(
- audio_data, current_time)
- if not current_fingerprint:
- # 无法生成指纹时默认过滤
- self.stats['echo_detected'] += 1
- # logger.debug("🚨 TTS淡出期内无法生成音频指纹,默认过滤")
- return True
- # 在淡出期内,允许明显的用户语音通过
- if self._has_obvious_voice_characteristics(current_fingerprint):
- # 检查与TTS的差异,但使用更宽松的标准
- if self._has_basic_difference_from_tts(current_fingerprint):
- # logger.debug("🎤 TTS淡出期内检测到明显用户语音,允许通过")
- return False
- # 其他情况仍然过滤,但记录更详细的信息
- self.stats['echo_detected'] += 1
- logger.debug(
- f"🚨 TTS淡出期内过滤音频(距离TTS结束 {current_time - self.last_tts_end_time:.1f}s)")
- return True
- # 生成当前音频的指纹
- current_fingerprint = self._generate_audio_fingerprint(
- audio_data, current_time)
- if not current_fingerprint:
- return False
- # 与最近的TTS指纹进行比较
- is_echo = self._compare_with_tts_fingerprints(
- current_fingerprint)
- if is_echo:
- self.stats['echo_detected'] += 1
- # 更新统计信息
- self.stats['total_processed'] += 1
- processing_time = time.time() - start_time
- self.stats['processing_time_avg'] = (
- self.stats['processing_time_avg'] * (self.stats['total_processed'] - 1) +
- processing_time
- ) / self.stats['total_processed']
- return is_echo
- except Exception as e:
- logger.error(f"❌ 回声检测失败: {e}")
- # 出错时默认不认为是回声,避免影响正常功能
- return False
def _generate_audio_fingerprint(self, audio_data: bytes, timestamp: float) -> Optional[AudioFingerprint]:
    """Build an ``AudioFingerprint`` for a chunk of raw audio.

    Args:
        audio_data: Raw audio bytes, interpreted as 16-bit integer samples.
        timestamp: Time to store on the fingerprint.

    Returns:
        The fingerprint, or None when there is no complete sample, when the
        mean squared amplitude is below ``self.energy_threshold``, or when an
        error occurs.

    A buffer with an odd number of bytes used to make ``np.frombuffer``
    raise (it requires a whole number of 2-byte items) and was logged as an
    error; the dangling byte is now ignored.
    """
    try:
        sample_count = len(audio_data) // 2
        if sample_count == 0:
            return None
        # count= limits the read to whole int16 samples without copying.
        samples = np.frombuffer(
            audio_data, dtype=np.int16, count=sample_count).astype(np.float32)

        energy = float(np.mean(samples ** 2))
        if energy < self.energy_threshold:
            return None

        spectral_features = self._extract_spectral_features(samples)
        # The hash covers energy, length and the mean/std of the band features.
        feature_str = f"{energy:.2f}_{len(samples)}_{np.mean(spectral_features):.4f}_{np.std(spectral_features):.4f}"
        fingerprint = hashlib.md5(feature_str.encode()).hexdigest()
        duration = len(samples) / self.sample_rate

        if EchoCancellationConf.should_log_audio_fingerprints():
            logger.debug(
                f"🎵 生成音频指纹: 能量={energy:.1f}, 时长={duration:.3f}s, 特征均值={np.mean(spectral_features):.3f}")
        return AudioFingerprint(
            fingerprint=fingerprint,
            timestamp=timestamp,
            duration=duration,
            energy=energy,
            spectral_features=spectral_features
        )
    except Exception as e:
        logger.error(f"❌ 生成音频指纹失败: {e}")
        return None
- def _extract_spectral_features(self, samples: np.ndarray) -> List[float]:
- """提取频谱特征"""
- try:
- # 确保样本长度足够进行FFT
- if len(samples) < self.fft_size:
- # 零填充
- padded_samples = np.zeros(self.fft_size)
- padded_samples[:len(samples)] = samples
- samples = padded_samples
- # 应用窗函数
- windowed = samples[:self.fft_size] * np.hanning(self.fft_size)
- # FFT变换
- fft_result = np.fft.fft(windowed)
- magnitude_spectrum = np.abs(fft_result[:self.fft_size//2])
- # 计算mel频率特征
- mel_features = self._compute_mel_features(magnitude_spectrum)
- return mel_features.tolist()
- except Exception as e:
- logger.error(f"❌ 提取频谱特征失败: {e}")
- return [0.0] * self.mel_filters
- def _compute_mel_features(self, magnitude_spectrum: np.ndarray) -> np.ndarray:
- """计算Mel频率特征"""
- try:
- # 简化的Mel滤波器组
- mel_filters = np.linspace(
- 0, len(magnitude_spectrum), self.mel_filters + 2)
- mel_features = np.zeros(self.mel_filters)
- for i in range(self.mel_filters):
- start_idx = int(mel_filters[i])
- end_idx = int(mel_filters[i + 2])
- if end_idx > start_idx:
- mel_features[i] = np.mean(
- magnitude_spectrum[start_idx:end_idx])
- # 对数变换
- mel_features = np.log(mel_features + 1e-10)
- return mel_features
- except Exception as e:
- logger.error(f"❌ 计算Mel特征失败: {e}")
- return np.zeros(self.mel_filters)
- def _compare_with_tts_fingerprints(self, current_fingerprint: AudioFingerprint) -> bool:
- """与TTS指纹进行比较"""
- try:
- current_time = current_fingerprint.timestamp
- for tts_fingerprint in self.tts_audio_fingerprints:
- # 检查时间窗口
- time_diff = current_time - tts_fingerprint.timestamp
- if time_diff > self.time_window:
- continue
- # 比较指纹哈希
- if current_fingerprint.fingerprint == tts_fingerprint.fingerprint:
- return True
- # 比较能量和频谱特征
- if self._is_similar_audio(current_fingerprint, tts_fingerprint):
- return True
- return False
- except Exception as e:
- logger.error(f"❌ 指纹比较失败: {e}")
- return False
- def _is_similar_audio(self, fp1: AudioFingerprint, fp2: AudioFingerprint) -> bool:
- """判断两个音频指纹是否相似"""
- try:
- # 能量相似性检查
- energy_ratio = min(fp1.energy, fp2.energy) / \
- max(fp1.energy, fp2.energy)
- if energy_ratio < 0.5:
- return False
- # 频谱特征相似性检查
- if len(fp1.spectral_features) != len(fp2.spectral_features):
- return False
- # 计算余弦相似度
- features1 = np.array(fp1.spectral_features)
- features2 = np.array(fp2.spectral_features)
- norm1 = np.linalg.norm(features1)
- norm2 = np.linalg.norm(features2)
- if norm1 == 0 or norm2 == 0:
- return False
- cosine_similarity = np.dot(features1, features2) / (norm1 * norm2)
- return cosine_similarity > self.correlation_threshold
- except Exception as e:
- logger.error(f"❌ 音频相似性计算失败: {e}")
- return False
- def _is_likely_user_voice(self, fingerprint: AudioFingerprint) -> bool:
- """判断是否可能是用户语音(用于打断检测)"""
- try:
- # 1. 能量阈值检查 - 用户语音通常有足够的能量
- user_voice_threshold = self.energy_threshold * \
- self.voice_detection_config['energy_multiplier']
- if fingerprint.energy < user_voice_threshold:
- return False
- # 2. 频谱特征分析 - 人声有特定的频谱特征
- features = np.array(fingerprint.spectral_features)
- # 检查频谱分布是否符合人声特征
- # 人声通常在中低频有较强的能量
- if len(features) >= 8:
- low_freq_energy = np.mean(features[:4]) # 低频部分
- mid_freq_energy = np.mean(features[4:8]) # 中频部分
- high_freq_energy = np.mean(features[8:]) if len(
- features) > 8 else 0 # 高频部分
- # 人声特征:中低频能量较强,高频相对较弱
- low_freq_weight = self.voice_detection_config['low_freq_weight']
- if mid_freq_energy > low_freq_energy * low_freq_weight and mid_freq_energy > high_freq_energy:
- # 3. 与最近TTS音频的差异检查
- if self._has_significant_difference_from_tts(fingerprint):
- self.stats['user_interrupts_detected'] += 1
- return True
- return False
- except Exception as e:
- logger.error(f"❌ 用户语音判断失败: {e}")
- return False
- def _has_significant_difference_from_tts(self, fingerprint: AudioFingerprint) -> bool:
- """检查与TTS音频是否有显著差异"""
- try:
- if not self.tts_audio_fingerprints:
- return True # 没有TTS参考,认为是用户语音
- current_time = fingerprint.timestamp
- tts_reference_window = EchoCancellationConf.TTS_REFERENCE_WINDOW
- # 找到最近的TTS指纹进行比较
- recent_tts_fingerprints = [
- fp for fp in self.tts_audio_fingerprints
- if current_time - fp.timestamp < tts_reference_window
- ]
- if not recent_tts_fingerprints:
- return True # 没有最近的TTS参考
- # 与最近的TTS指纹比较
- check_count = self.voice_detection_config['recent_tts_check_count']
- energy_diff_threshold = self.voice_detection_config['energy_diff_threshold']
- spectral_diff_threshold = self.voice_detection_config['spectral_diff_threshold']
- for tts_fp in recent_tts_fingerprints[-check_count:]:
- # 能量差异检查
- energy_diff = abs(fingerprint.energy - tts_fp.energy) / \
- max(fingerprint.energy, tts_fp.energy)
- if energy_diff > energy_diff_threshold:
- continue
- # 频谱特征差异检查
- if len(fingerprint.spectral_features) == len(tts_fp.spectral_features):
- features1 = np.array(fingerprint.spectral_features)
- features2 = np.array(tts_fp.spectral_features)
- # 计算频谱差异
- spectral_diff = np.mean(np.abs(features1 - features2))
- if spectral_diff < spectral_diff_threshold:
- return False # 频谱过于相似,可能是回声
- return True # 与TTS有显著差异,可能是用户语音
- except Exception as e:
- logger.error(f"❌ TTS差异检查失败: {e}")
- return True # 出错时倾向于认为是用户语音
- def _update_processing_time(self, processing_time: float):
- """更新处理时间统计"""
- if self.stats['total_processed'] > 0:
- alpha = 0.1 # 平滑因子
- self.stats['processing_time_avg'] = (
- alpha * processing_time +
- (1 - alpha) * self.stats['processing_time_avg']
- )
- else:
- self.stats['processing_time_avg'] = processing_time
- def cleanup_old_fingerprints(self):
- """清理过期的指纹"""
- try:
- with self._lock:
- current_time = time.time()
- # 清理过期的TTS指纹
- while (self.tts_audio_fingerprints and
- current_time - self.tts_audio_fingerprints[0].timestamp > self.time_window):
- self.tts_audio_fingerprints.popleft()
- except Exception as e:
- logger.error(f"❌ 清理指纹失败: {e}")
def get_stats(self) -> Dict:
    """Return a snapshot of the counters plus a few derived values.

    Both rates divide by ``max(total_processed, 1)`` so they are defined
    before anything has been processed. The average processing time is
    reported in milliseconds. The ``config`` entry echoes the thresholds
    currently in effect.
    """
    with self._lock:
        counters = self.stats
        total = counters['total_processed']
        divisor = max(total, 1)
        echoes = counters['echo_detected']
        interrupts = counters['user_interrupts_detected']
        return {
            'total_processed': total,
            'echo_detected': echoes,
            'user_interrupts_detected': interrupts,
            'echo_detection_rate': echoes / divisor,
            'interrupt_success_rate': interrupts / divisor,
            'processing_time_avg_ms': counters['processing_time_avg'] * 1000,
            'fingerprints_stored': len(self.tts_audio_fingerprints),
            'config': {
                'is_enabled': self.is_enabled,
                'interrupt_during_playback': EchoCancellationConf.ENABLE_INTERRUPT_DURING_PLAYBACK,
                'energy_threshold': self.energy_threshold,
                'user_voice_threshold': self.energy_threshold * self.voice_detection_config['energy_multiplier']
            }
        }
def enable(self):
    """Switch echo cancellation on and log the change."""
    self.is_enabled = True
    logger.info("✅ 回声消除已启用")
def disable(self):
    """Switch echo cancellation off and log the change.

    While disabled, ``is_echo_audio`` returns False for every chunk.
    """
    self.is_enabled = False
    logger.info("❌ 回声消除已禁用")
- def _is_tts_variant_audio(self, fingerprint: AudioFingerprint) -> bool:
- """检测是否为TTS音频的变种(经过扬声器-麦克风传输后的音频)"""
- try:
- if not self.tts_audio_fingerprints:
- return False
- current_time = fingerprint.timestamp
- detection_window = self.tts_filtering_config['variant_detection_window']
- # 检查最近的TTS指纹
- recent_tts_fingerprints = [
- fp for fp in self.tts_audio_fingerprints
- if current_time - fp.timestamp < detection_window
- ]
- if not recent_tts_fingerprints:
- return False
- energy_range = self.tts_filtering_config['energy_attenuation_range']
- similarity_threshold = self.tts_filtering_config['variant_similarity_threshold']
- correlation_threshold = self.tts_filtering_config['frequency_correlation_threshold']
- # 检查所有最近的TTS指纹,而不仅仅是最后几个
- for tts_fp in recent_tts_fingerprints:
- # 1. 时间相关性检查 - 播放开始后短时间内的音频很可能是回声
- time_diff = current_time - tts_fp.timestamp
- if time_diff < 1.0: # 1秒内
- # 2. 能量衰减检查 - 扬声器播放的音频通过麦克风录制会有能量衰减
- if tts_fp.energy > 0: # 避免除零
- energy_ratio = fingerprint.energy / tts_fp.energy
- if energy_range[0] <= energy_ratio <= energy_range[1]:
- # 3. 频谱形状相似性检查
- if len(fingerprint.spectral_features) == len(tts_fp.spectral_features):
- features1 = np.array(
- fingerprint.spectral_features)
- features2 = np.array(tts_fp.spectral_features)
- # 归一化频谱特征以消除能量差异的影响
- if np.linalg.norm(features1) > 0 and np.linalg.norm(features2) > 0:
- features1_norm = features1 / \
- np.linalg.norm(features1)
- features2_norm = features2 / \
- np.linalg.norm(features2)
- # 计算归一化后的相似度
- similarity = np.dot(
- features1_norm, features2_norm)
- # 如果归一化后的相似度高,很可能是TTS音频的变种
- if similarity > similarity_threshold:
- if EchoCancellationConf.should_log_detection_details():
- logger.debug(
- f"🚨 TTS变种检测: 时间差={time_diff:.3f}s, 能量比={energy_ratio:.3f}, 相似度={similarity:.3f}")
- return True
- # 4. 频率分布相关性检查
- if self._has_similar_frequency_distribution(fingerprint, tts_fp, correlation_threshold):
- if EchoCancellationConf.should_log_detection_details():
- logger.debug(f"🚨 TTS变种检测: 频率分布相似,时间差={time_diff:.3f}s")
- return True
- return False
- except Exception as e:
- logger.error(f"❌ TTS变种检测失败: {e}")
- return False
- def _has_similar_frequency_distribution(self, fp1: AudioFingerprint, fp2: AudioFingerprint, threshold: float = 0.5) -> bool:
- """检查两个音频指纹是否有相似的频率分布"""
- try:
- if len(fp1.spectral_features) != len(fp2.spectral_features):
- return False
- features1 = np.array(fp1.spectral_features)
- features2 = np.array(fp2.spectral_features)
- # 计算频率分布的相关系数
- if len(features1) > 1:
- correlation = np.corrcoef(features1, features2)[0, 1]
- return not np.isnan(correlation) and correlation > threshold
- return False
- except Exception as e:
- logger.error(f"❌ 频率分布比较失败: {e}")
- return False
- def _is_definitely_user_voice(self, fingerprint: AudioFingerprint) -> bool:
- """严格判断是否为确定的用户语音(用于播放中的打断检测)"""
- try:
- # 1. 更高的能量阈值 - 用户打断时通常会更大声
- energy_multiplier = self.tts_filtering_config['definite_voice_energy_multiplier']
- high_energy_threshold = self.energy_threshold * \
- self.voice_detection_config['energy_multiplier'] * \
- energy_multiplier
- if fingerprint.energy < high_energy_threshold:
- return False
- # 2. 严格的频谱特征分析
- features = np.array(fingerprint.spectral_features)
- if len(features) >= 8:
- low_freq_energy = np.mean(features[:4]) # 低频部分
- mid_freq_energy = np.mean(features[4:8]) # 中频部分
- high_freq_energy = np.mean(features[8:]) if len(
- features) > 8 else 0 # 高频部分
- # 人声特征检查 - 更严格的标准
- # 中频能量应该明显高于低频和高频
- if not (mid_freq_energy > low_freq_energy * 0.8 and
- mid_freq_energy > high_freq_energy * 1.2):
- return False
- # 3. 检查频谱的动态范围 - 人声通常有较大的动态范围
- min_spectral_range = self.tts_filtering_config['min_spectral_range']
- spectral_range = np.max(features) - np.min(features)
- if spectral_range < min_spectral_range:
- return False
- # 4. 与所有TTS音频的差异检查 - 必须与所有TTS音频都有显著差异
- if not self._has_significant_difference_from_all_tts(fingerprint):
- return False
- # 5. 检查音频的复杂度 - 人声通常比TTS更复杂
- if not self._has_sufficient_complexity(fingerprint):
- return False
- # 所有检查都通过,认为是确定的用户语音
- self.stats['user_interrupts_detected'] += 1
- return True
- except Exception as e:
- logger.error(f"❌ 确定用户语音判断失败: {e}")
- return False
- def _has_significant_difference_from_all_tts(self, fingerprint: AudioFingerprint) -> bool:
- """检查与所有TTS音频是否都有显著差异"""
- try:
- if not self.tts_audio_fingerprints:
- return True
- current_time = fingerprint.timestamp
- max_similarity = self.tts_filtering_config['max_similarity_with_tts']
- # 检查所有最近的TTS指纹
- recent_tts_fingerprints = [
- fp for fp in self.tts_audio_fingerprints
- if current_time - fp.timestamp < 3.0 # 3秒内的所有TTS
- ]
- if not recent_tts_fingerprints:
- return True
- # 必须与所有TTS音频都有显著差异
- for tts_fp in recent_tts_fingerprints:
- # 能量差异检查 - 更严格
- energy_ratio = min(fingerprint.energy, tts_fp.energy) / \
- max(fingerprint.energy, tts_fp.energy)
- if energy_ratio > 0.7: # 能量过于相似
- return False
- # 频谱相似度检查 - 更严格
- if len(fingerprint.spectral_features) == len(tts_fp.spectral_features):
- features1 = np.array(fingerprint.spectral_features)
- features2 = np.array(tts_fp.spectral_features)
- if np.linalg.norm(features1) > 0 and np.linalg.norm(features2) > 0:
- # 归一化比较
- features1_norm = features1 / np.linalg.norm(features1)
- features2_norm = features2 / np.linalg.norm(features2)
- similarity = np.dot(features1_norm, features2_norm)
- if similarity > max_similarity: # 相似度过高
- return False
- return True
- except Exception as e:
- logger.error(f"❌ 全TTS差异检查失败: {e}")
- return False
- def _has_sufficient_complexity(self, fingerprint: AudioFingerprint) -> bool:
- """检查音频是否有足够的复杂度(人声特征)"""
- try:
- features = np.array(fingerprint.spectral_features)
- min_variation = self.tts_filtering_config['min_spectral_variation']
- # 1. 频谱变化检查 - 人声通常有更多的频谱变化
- if len(features) > 1:
- spectral_variation = np.std(features)
- if spectral_variation < min_variation:
- return False
- # 2. 频谱分布检查 - 人声应该有特定的频率分布
- if len(features) >= 6:
- # 检查是否有明显的共振峰特征
- # 人声通常在某些频段有能量集中
- max_energy_idx = np.argmax(features)
- if max_energy_idx < 2 or max_energy_idx > len(features) - 2:
- # 能量峰值在边缘,可能不是人声
- return False
- return True
- except Exception as e:
- logger.error(f"❌ 复杂度检查失败: {e}")
- return False
- def _is_likely_user_voice_relaxed(self, fingerprint: AudioFingerprint) -> bool:
- """宽松判断是否为用户语音(用于非严格模式的打断检测)"""
- try:
- # 1. 极严格的能量阈值 - 避免嘈杂环境误判
- user_voice_threshold = self.energy_threshold * 5.0 # 大幅提高阈值
- if fingerprint.energy < user_voice_threshold:
- return False
- # 2. 如果音频能量足够,需要极严格的检查
- moderate_energy_threshold = self.energy_threshold * 20.0 # 极大幅提高阈值
- if fingerprint.energy > moderate_energy_threshold:
- # 能量足够时,需要通过更严格的差异检查
- if self._has_strict_difference_from_tts(fingerprint):
- logger.debug(
- f"🎤 高能量({fingerprint.energy:.1f}),通过严格差异检查,认为是用户语音")
- return True
- else:
- logger.debug(
- f"🚫 高能量({fingerprint.energy:.1f}),但未通过严格差异检查,可能是噪音")
- return False
- # 3. 基本的频谱特征检查(更宽松)
- features = np.array(fingerprint.spectral_features)
- if len(features) >= 4: # 降低要求
- # 检查是否有人声的基本特征
- if len(features) >= 6:
- mid_freq_energy = np.mean(features[2:5]) # 中频部分
- total_energy = np.mean(features)
- # 中频能量占比检查(更宽松)
- if mid_freq_energy > total_energy * 0.3: # 从0.5降低到0.3
- # 简单的与TTS差异检查
- if self._has_basic_difference_from_tts(fingerprint):
- return True
- # 4. 如果音频足够大声,需要极严格的检查
- high_energy_threshold = self.energy_threshold * 50.0 # 提高到50.0,极严格
- if fingerprint.energy > high_energy_threshold:
- # 即使能量很高,也要检查与TTS的差异,并且需要更严格的条件
- if self._has_strict_difference_from_tts(fingerprint):
- logger.debug(
- f"🎤 极高能量({fingerprint.energy:.1f})且通过严格差异检查,认为是用户语音")
- return True
- else:
- logger.debug(
- f"🚫 极高能量({fingerprint.energy:.1f}),但未通过严格差异检查,可能是强噪音")
- return False
- # 5. 时间窗口检查 - 距离TTS较远时更容易认为是用户语音
- if self.tts_audio_fingerprints:
- last_tts_time = max(
- fp.timestamp for fp in self.tts_audio_fingerprints)
- time_since_last_tts = fingerprint.timestamp - last_tts_time
- if time_since_last_tts > 1.5: # 从1.0提高到1.5秒
- logger.debug(
- f"🎤 距离最后TTS较远({time_since_last_tts:.1f}s),认为是用户语音")
- return True
- return False
- except Exception as e:
- logger.error(f"❌ 宽松用户语音判断失败: {e}")
- # 出错时倾向于认为是用户语音,允许打断
- return True
- def _has_basic_difference_from_tts(self, fingerprint: AudioFingerprint) -> bool:
- """基本的与TTS差异检查(更宽松)"""
- try:
- if not self.tts_audio_fingerprints:
- return True # 没有TTS参考,认为是用户语音
- current_time = fingerprint.timestamp
- # 检查是否在TTS播放期间
- if self.is_playing_tts:
- # TTS播放期间采用更严格的标准
- # 只检查最近0.3秒内的TTS,缩短时间窗口
- recent_tts_fingerprints = [
- fp for fp in self.tts_audio_fingerprints
- if current_time - fp.timestamp < 0.3
- ]
- if not recent_tts_fingerprints:
- return True # 没有最近的TTS参考
- # 与最近的TTS指纹比较(TTS播放期间更严格的标准)
- for tts_fp in recent_tts_fingerprints[-1:]: # 只检查最近1个
- # 能量差异检查(TTS播放期间更严格)
- energy_ratio = min(fingerprint.energy, tts_fp.energy) / \
- max(fingerprint.energy, tts_fp.energy)
- if energy_ratio > 0.6: # TTS播放期间需要更大的能量差异
- logger.debug(
- f"🚨 TTS播放期间能量过于相似({energy_ratio:.2f}),可能是回声")
- return False
- # 频谱特征差异检查(TTS播放期间更严格)
- if len(fingerprint.spectral_features) == len(tts_fp.spectral_features):
- features1 = np.array(fingerprint.spectral_features)
- features2 = np.array(tts_fp.spectral_features)
- # 计算简单的欧氏距离
- distance = np.linalg.norm(features1 - features2)
- if distance < 0.8: # TTS播放期间需要更大的频谱差异
- logger.debug(
- f"🚨 TTS播放期间频谱距离过小({distance:.2f}),可能是回声")
- return False
- # 检查相关性
- if np.linalg.norm(features1) > 0 and np.linalg.norm(features2) > 0:
- features1_norm = features1 / \
- np.linalg.norm(features1)
- features2_norm = features2 / \
- np.linalg.norm(features2)
- correlation = np.dot(
- features1_norm, features2_norm)
- if correlation > 0.5: # TTS播放期间相关性要求更严格
- logger.debug(
- f"🚨 TTS播放期间频谱相关性过高({correlation:.2f}),可能是回声")
- return False
- return True # 通过TTS播放期间的严格检查
- else:
- # 非TTS播放期间采用原来的宽松标准
- # 只检查最近0.5秒内的TTS,缩短时间窗口
- recent_tts_fingerprints = [
- fp for fp in self.tts_audio_fingerprints
- if current_time - fp.timestamp < 0.5
- ]
- if not recent_tts_fingerprints:
- return True # 没有最近的TTS参考
- # 与最近的TTS指纹比较(非常宽松的标准)
- for tts_fp in recent_tts_fingerprints[-1:]: # 只检查最近1个
- # 能量差异检查(更宽松)
- energy_ratio = min(fingerprint.energy, tts_fp.energy) / \
- max(fingerprint.energy, tts_fp.energy)
- if energy_ratio > 0.8: # 从0.5提高到0.8,需要更相似才认为可疑
- # 频谱特征差异检查(更宽松)
- if len(fingerprint.spectral_features) == len(tts_fp.spectral_features):
- features1 = np.array(fingerprint.spectral_features)
- features2 = np.array(tts_fp.spectral_features)
- # 计算简单的欧氏距离
- distance = np.linalg.norm(features1 - features2)
- if distance < 0.5: # 从1.0降低到0.5,需要更相似才认为是回声
- logger.debug(f"🚨 频谱距离过小({distance:.2f}),可能是回声")
- return False
- return True # 通过基本检查,认为是用户语音
- except Exception as e:
- logger.error(f"❌ 基本TTS差异检查失败: {e}")
- return True # 出错时倾向于认为是用户语音
- def _has_strict_difference_from_tts(self, fingerprint: AudioFingerprint) -> bool:
- """严格的与TTS差异检查(用于高能量音频)"""
- try:
- if not self.tts_audio_fingerprints:
- return True # 没有TTS参考,认为是用户语音
- current_time = fingerprint.timestamp
- # 检查最近1秒内的TTS,时间窗口更严格
- recent_tts_fingerprints = [
- fp for fp in self.tts_audio_fingerprints
- if current_time - fp.timestamp < 1.0
- ]
- if not recent_tts_fingerprints:
- return True # 没有最近的TTS参考
- # 与最近的TTS指纹比较(非常严格的标准)
- for tts_fp in recent_tts_fingerprints:
- # 1. 时间相关性检查 - 如果在TTS播放后很短时间内出现,更可能是回声
- time_diff = current_time - tts_fp.timestamp
- if time_diff < 0.3: # 300ms内
- logger.debug(f"🚫 时间过近({time_diff:.2f}s),可能是回声")
- return False
- # 2. 能量差异检查(非常严格)
- energy_ratio = min(fingerprint.energy, tts_fp.energy) / \
- max(fingerprint.energy, tts_fp.energy)
- if energy_ratio > 0.9: # 需要能量差异很大才认为不是回声
- logger.debug(f"🚫 能量过于相似({energy_ratio:.2f}),可能是回声")
- return False
- # 3. 频谱特征差异检查(非常严格)
- if len(fingerprint.spectral_features) == len(tts_fp.spectral_features):
- features1 = np.array(fingerprint.spectral_features)
- features2 = np.array(tts_fp.spectral_features)
- # 计算欧氏距离和相关性
- distance = np.linalg.norm(features1 - features2)
- if distance < 0.3: # 需要频谱差异很大
- logger.debug(f"🚫 频谱过于相似({distance:.2f}),可能是回声")
- return False
- # 检查相关性
- if np.linalg.norm(features1) > 0 and np.linalg.norm(features2) > 0:
- features1_norm = features1 / np.linalg.norm(features1)
- features2_norm = features2 / np.linalg.norm(features2)
- correlation = np.dot(features1_norm, features2_norm)
- if correlation > 0.7: # 相关性过高
- logger.debug(f"🚫 频谱相关性过高({correlation:.2f}),可能是回声")
- return False
- return True # 通过严格检查,认为是用户语音
- except Exception as e:
- logger.error(f"❌ 严格TTS差异检查失败: {e}")
- return False # 出错时倾向于认为是回声,减少误判
- def _is_definitely_echo(self, fingerprint: AudioFingerprint) -> bool:
- """确定判断是否为回声(用于非严格模式)"""
- try:
- if not self.tts_audio_fingerprints:
- return False # 没有TTS参考,不能确定是回声
- current_time = fingerprint.timestamp
- # 检查最近的TTS指纹
- recent_tts_fingerprints = [
- fp for fp in self.tts_audio_fingerprints
- if current_time - fp.timestamp < 1.5 # 1.5秒内的TTS
- ]
- if not recent_tts_fingerprints:
- return False # 没有最近的TTS参考
- for tts_fp in recent_tts_fingerprints:
- # 1. 时间相关性检查 - 如果在TTS播放后很短时间内出现
- time_diff = current_time - tts_fp.timestamp
- if time_diff < 0.3: # 300ms内
- # 2. 能量衰减检查 - 符合扬声器到麦克风的衰减特征
- energy_ratio = fingerprint.energy / tts_fp.energy
- if 0.1 <= energy_ratio <= 0.7: # 能量衰减10%-70%
- # 3. 频谱相似性检查 - 高度相似
- if len(fingerprint.spectral_features) == len(tts_fp.spectral_features):
- features1 = np.array(fingerprint.spectral_features)
- features2 = np.array(tts_fp.spectral_features)
- if np.linalg.norm(features1) > 0 and np.linalg.norm(features2) > 0:
- # 归一化比较
- features1_norm = features1 / \
- np.linalg.norm(features1)
- features2_norm = features2 / \
- np.linalg.norm(features2)
- similarity = np.dot(
- features1_norm, features2_norm)
- # 如果相似度很高,几乎确定是回声
- if similarity > 0.8:
- return True
- # 4. 完全匹配检查
- if fingerprint.fingerprint == tts_fp.fingerprint:
- return True
- return False # 不能确定是回声
- except Exception as e:
- logger.error(f"❌ 确定回声判断失败: {e}")
- return False # 出错时不确定是回声
def _trigger_interrupt(self):
    """Retired interrupt hook: no interrupt is raised any more.

    The method only debounces (calls within 0.5 s of the previous one are
    skipped) and logs that the audio is muted while playback continues.
    Errors are logged together with their traceback.
    """
    try:
        current_time = time.time()
        # Debounce: ignore a call that follows the last one too quickly.
        repeated = (hasattr(self, '_last_trigger_time')
                    and current_time - self._last_trigger_time < 0.5)
        if repeated:
            logger.debug("🔇 打断信号防抖:跳过重复触发")
            return
        self._last_trigger_time = current_time
        # No pending interrupt request is created; the audio is only muted.
        logger.debug("🔇 回声消除检测到音频,进行消音处理但不打断播放")
    except Exception as e:
        logger.error(f"❌ 消音处理失败: {e}")
        import traceback
        logger.error(f"消音处理异常详情: {traceback.format_exc()}")
- def _is_very_strong_user_voice(self, fingerprint: AudioFingerprint) -> bool:
- """
- 检测是否为非常强的用户语音
- 用于在TTS播放期间让明显的用户语音通过,触发VAD和IAT
- """
- try:
- # 1. 适中的能量阈值 - 让更多用户语音通过
- high_energy_threshold = self.energy_threshold * 3.0 # 降低到3倍基础阈值
- if fingerprint.energy < high_energy_threshold:
- return False
- # 2. 检查是否有明显的人声特征
- if not self._has_obvious_voice_characteristics(fingerprint):
- return False
- # 3. 与TTS音频有足够差异
- if not self._has_significant_difference_from_tts(fingerprint):
- return False
- # logger.debug(f"🔊 检测到强用户语音,能量: {fingerprint.energy:.1f}")
- return True
- except Exception as e:
- logger.error(f"❌ 强用户语音检测失败: {e}")
- return False
- def _has_extreme_difference_from_tts(self, fingerprint: AudioFingerprint) -> bool:
- """检查与TTS音频是否有极大差异"""
- try:
- if not self.tts_audio_fingerprints:
- return True
- current_time = fingerprint.timestamp
- # 检查最近的TTS指纹
- recent_tts_fingerprints = [
- fp for fp in self.tts_audio_fingerprints
- if current_time - fp.timestamp < 3.0 # 3秒内的TTS,延长检查时间
- ]
- if not recent_tts_fingerprints:
- return True
- # 必须与所有TTS音频都有极大的能量差异
- for tts_fp in recent_tts_fingerprints:
- energy_ratio = min(fingerprint.energy, tts_fp.energy) / \
- max(fingerprint.energy, tts_fp.energy)
- if energy_ratio > 0.3: # 能量差异阈值降低到0.3,更严格
- return False
- # 进一步检查频谱特征差异
- for tts_fp in recent_tts_fingerprints:
- if self._has_similar_frequency_distribution(fingerprint, tts_fp, threshold=0.6):
- return False
- return True
- except Exception as e:
- logger.error(f"❌ 极大TTS差异检查失败: {e}")
- return False
- def _has_obvious_voice_characteristics(self, fingerprint: AudioFingerprint) -> bool:
- """检查是否有明显的人声特征"""
- try:
- features = np.array(fingerprint.spectral_features)
- if len(features) < 6:
- return False
- # 简单但有效的人声检查
- low_freq = np.mean(features[:2]) # 低频
- mid_freq = np.mean(features[2:5]) # 中频
- high_freq = np.mean(features[5:]) # 高频
- # 放宽人声检测条件
- # 1. 中频能量较强(主要条件)
- if mid_freq > low_freq * 0.6 and mid_freq > high_freq * 0.6:
- return True
- # 2. 或者能量分布相对均匀(人声特征)
- total_energy = low_freq + mid_freq + high_freq
- if total_energy > 0:
- mid_ratio = mid_freq / total_energy
- if 0.25 < mid_ratio < 0.6: # 中频占比合理范围
- return True
- return False
- except Exception as e:
- logger.error(f"❌ 明显人声特征检测失败: {e}")
- return False
- # 已移除:_is_very_likely_user_voice_during_tts 方法(不再需要TTS期间的打断检测)
- def _has_strong_voice_characteristics(self, fingerprint: AudioFingerprint) -> bool:
- """检查是否具有强烈的人声特征"""
- try:
- features = np.array(fingerprint.spectral_features)
- if len(features) < 8:
- return False
- # 更严格的频谱分布检查
- low_freq = np.mean(features[:2]) # 低频
- mid_freq = np.mean(features[2:6]) # 中频
- high_freq = np.mean(features[6:]) # 高频
- # 中频能量必须明显强于低频和高频(人声特征)
- if mid_freq < low_freq * 1.2 or mid_freq < high_freq * 1.5:
- return False
- # 检查频谱的平滑度 - 人声通常有特定的共振峰
- spectral_variance = np.var(features)
- if spectral_variance < 0.1: # 频谱过于平滑可能是TTS
- return False
- return True
- except Exception as e:
- logger.error(f"❌ 强人声特征检测失败: {e}")
- return False
- def _has_high_spectral_complexity(self, fingerprint: AudioFingerprint) -> bool:
- """检查是否具有高频谱复杂度"""
- try:
- features = np.array(fingerprint.spectral_features)
- if len(features) < 4:
- return False
- # 计算频谱的熵(复杂度指标)
- normalized_features = features / \
- np.sum(features) if np.sum(features) > 0 else features
- entropy = -np.sum(normalized_features *
- np.log(normalized_features + 1e-10))
- # 人声通常有较高的熵值
- min_entropy = 2.0 # 根据实际情况调整
- if entropy < min_entropy:
- return False
- return True
- except Exception as e:
- logger.error(f"❌ 频谱复杂度检测失败: {e}")
- return False
- def _is_possibly_user_voice_during_tts(self, fingerprint: AudioFingerprint) -> bool:
- """
- 在TTS播放期间判断是否可能是用户语音
- 使用更宽松的标准,主要用于创建待验证的打断请求
- """
- try:
- # 1. 基本能量检查 - 需要足够的能量
- min_energy_threshold = self.energy_threshold * 2.0 # 降低能量要求
- if fingerprint.energy < min_energy_threshold:
- return False
- # 2. 与TTS音频的基本差异检查
- if not self._has_basic_difference_from_tts(fingerprint):
- return False
- # 3. 检查是否有人声特征
- if not self._has_voice_characteristics(fingerprint):
- return False
- return True
- except Exception as e:
- logger.error(f"❌ TTS期间用户语音检测失败: {e}")
- return False
- def _has_voice_characteristics(self, fingerprint: AudioFingerprint) -> bool:
- """检查音频是否具有人声特征"""
- try:
- features = np.array(fingerprint.spectral_features)
- if len(features) < 4:
- return False
- # 检查频谱分布 - 人声通常在中频有较强能量
- if len(features) >= 8:
- low_freq = np.mean(features[:3])
- mid_freq = np.mean(features[3:7])
- high_freq = np.mean(features[7:]) if len(features) > 7 else 0
- # 中频能量应该相对较强
- if mid_freq < low_freq * 0.5:
- return False
- return True
- except Exception as e:
- logger.error(f"❌ 人声特征检测失败: {e}")
- return False
- # 已移除:_create_pending_interrupt_request 方法(不再需要创建待验证的打断请求)
class SelfVoiceDetector:
    """Self-voice detector built on top of an ``EchoCancellationEngine``.

    Forwards TTS playback state and captured audio to the engine, notifies
    registered callbacks about TTS status changes, and runs a daemon thread
    that keeps asking the engine to drop old fingerprints until ``shutdown``
    is called.
    """

    # Pause between two cleanup passes, in seconds.
    _CLEANUP_INTERVAL_SECONDS = 0.1
    # Longer pause after a failed cleanup pass, in seconds.
    _CLEANUP_ERROR_BACKOFF_SECONDS = 1.0

    def __init__(self):
        """Create the engine, the callback registry and start the cleanup thread."""
        self.echo_engine = EchoCancellationEngine()
        self.voice_callbacks: Dict[str, Callable] = {}
        self._lock = threading.RLock()

        # Periodic cleanup thread; the stop event exists before the thread
        # is started so the loop can always consult it.
        self._stop_cleanup = threading.Event()
        self._cleanup_thread = threading.Thread(
            target=self._periodic_cleanup, daemon=True)
        self._cleanup_thread.start()

        logger.info("🎯 自我声音检测器已初始化")

    def register_voice_callback(self, name: str, callback: Callable):
        """Register ``callback`` under ``name``, replacing any previous entry."""
        with self._lock:
            self.voice_callbacks[name] = callback
        logger.info(f"📝 注册声音检测回调: {name}")

    def unregister_voice_callback(self, name: str):
        """Remove the callback registered under ``name``; unknown names are ignored."""
        with self._lock:
            if name in self.voice_callbacks:
                del self.voice_callbacks[name]
                logger.info(f"🗑️ 注销声音检测回调: {name}")

    def set_tts_playing(self, is_playing: bool, audio_data: Optional[bytes] = None):
        """Forward the TTS playback state to the engine and notify callbacks.

        Every callback is called as
        ``callback('tts_status_changed', {'is_playing': ..., 'timestamp': ...})``.
        A failing callback is logged and does not stop the others.

        Callbacks run on a snapshot taken under the lock, outside the lock
        itself, so a callback may register or unregister callbacks without
        breaking the iteration. A callback removed during a notification
        round may still receive that round.
        """
        self.echo_engine.set_tts_playing_status(is_playing, audio_data)

        with self._lock:
            listeners = list(self.voice_callbacks.items())

        for name, callback in listeners:
            try:
                callback('tts_status_changed', {
                    'is_playing': is_playing,
                    'timestamp': time.time()
                })
            except Exception as e:
                logger.error(f"❌ 声音检测回调 {name} 执行失败: {e}")

    def should_ignore_audio(self, audio_data: bytes) -> bool:
        """Return the engine's echo verdict for ``audio_data``."""
        return self.echo_engine.is_echo_audio(audio_data)

    def process_recording_audio(self, audio_data: bytes) -> bool:
        """Return True when recorded audio should be processed further.

        Audio that ``should_ignore_audio`` flags as echo yields False.
        """
        if self.should_ignore_audio(audio_data):
            return False
        return True

    def _periodic_cleanup(self):
        """Ask the engine to drop old fingerprints until the stop event is set.

        Waiting on the stop event instead of sleeping lets ``shutdown`` end
        the loop immediately rather than after the current pause.
        """
        while not self._stop_cleanup.is_set():
            try:
                self.echo_engine.cleanup_old_fingerprints()
                pause = self._CLEANUP_INTERVAL_SECONDS
            except Exception as e:
                logger.error(f"❌ 定期清理失败: {e}")
                pause = self._CLEANUP_ERROR_BACKOFF_SECONDS  # back off after a failure
            self._stop_cleanup.wait(pause)

    def get_detection_stats(self) -> Dict:
        """Return the statistics reported by the engine."""
        return self.echo_engine.get_stats()

    def enable_echo_cancellation(self):
        """Enable echo cancellation in the engine."""
        self.echo_engine.enable()

    def disable_echo_cancellation(self):
        """Disable echo cancellation in the engine."""
        self.echo_engine.disable()

    def shutdown(self):
        """Stop the cleanup thread, waiting up to 2 s for it to finish."""
        logger.info("🔄 关闭自我声音检测器...")
        self._stop_cleanup.set()
        if self._cleanup_thread.is_alive():
            self._cleanup_thread.join(timeout=2.0)
        logger.info("✅ 自我声音检测器已关闭")
# Global instance.
# Holds the shared SelfVoiceDetector: it stays None until
# get_self_voice_detector() creates one, and cleanup_self_voice_detector()
# resets it to None after shutting the instance down.
_self_voice_detector: Optional[SelfVoiceDetector] = None
# Held by both functions above while they read or replace the shared instance.
_detector_lock = threading.Lock()
def get_self_voice_detector() -> SelfVoiceDetector:
    """Return the shared self-voice detector, creating it on first use.

    Lookup and creation both happen while ``_detector_lock`` is held, so
    every caller receives the same instance.
    """
    global _self_voice_detector
    with _detector_lock:
        detector = _self_voice_detector
        if detector is None:
            # First request: build the instance and publish it.
            detector = SelfVoiceDetector()
            _self_voice_detector = detector
    return detector
def cleanup_self_voice_detector():
    """Shut down and forget the shared detector instance, if one exists.

    Runs under ``_detector_lock``. Calling it when no instance exists does
    nothing.
    """
    global _self_voice_detector
    with _detector_lock:
        if _self_voice_detector is None:
            return
        _self_voice_detector.shutdown()
        # Drop the reference so the next lookup builds a fresh instance.
        _self_voice_detector = None
|