| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282 |
- #!/usr/bin/python
- # coding=utf-8
r"""
Author: zhaoyong 77912776@qq.com
Date: 2025-07-02
LastEditTime: 2025-08-24
FilePath: \robot_ai\handlers\aiui\Recorder.py
Description: Audio recorder class (optimized version)
"""
- import platform
- import struct
- import threading
- import numpy as np
- import platform
- import sounddevice as sd
# Select sounddevice's default device per host OS: 'pulse' on Linux,
# None on Windows and macOS. Any other platform is left untouched.
system = platform.system().lower()
if system == "linux":
    sd.default.device = 'pulse'
elif system in ("windows", "darwin"):
    sd.default.device = None
- from contextlib import contextmanager
- from utils.echo_cancellation import get_self_voice_detector
- from utils.logger import logger
def calc_rms(audio_bytes: bytes) -> float:
    """Return the root-mean-square amplitude of 16-bit PCM audio.

    Args:
        audio_bytes: Raw audio, interpreted as native-endian signed 16-bit
            samples. A trailing odd byte (an incomplete sample) is ignored
            instead of raising.

    Returns:
        The RMS of the samples as a float, or 0.0 when the input holds no
        complete sample.
    """
    if not audio_bytes:
        return 0.0

    sample_count = len(audio_bytes) // 2
    if sample_count == 0:
        return 0.0

    # ``count`` limits the view to whole samples, so an odd trailing byte
    # is simply left out.
    samples = np.frombuffer(audio_bytes, dtype=np.int16, count=sample_count)

    # Promote to float64 before squaring; squaring int16 values in place
    # would overflow. The inputs are finite integers, so the mean of their
    # squares is always finite and non-negative.
    mean_squared = np.mean(samples.astype(np.float64) ** 2)
    return float(np.sqrt(mean_squared))
class Recorder:
    """Microphone recorder built on ``sounddevice.RawInputStream``.

    On construction an input device is selected (see ``_find_device``),
    preferring one that accepts the requested sample rate. ``read()`` yields
    raw audio chunks, optionally filtered through the self-voice detector,
    and periodically refreshes a simple RMS-based voice-activity flag.

    Args:
        chunk: Block size handed to the stream. ``read()`` also uses this
            number as the byte length of each chunk it yields.
        channels: Number of input channels.
        rate: Requested sample rate in Hz.
        fmt: Sample dtype passed to sounddevice; defaults to ``'int16'``.

    Raises:
        RuntimeError: If no audio input device is available.
    """

    def __init__(self, chunk: int, channels: int = 1, rate: int = 16000, fmt: str = None):
        self.chunk = chunk
        self.channels = channels
        self.target_rate = rate  # requested sample rate
        self.actual_rate = rate  # rate actually used; _find_device() may change it
        self.format = fmt or 'int16'
        self.platform = platform.system().lower()

        # Stream state is created before device probing so that close() and
        # __del__ operate on a fully-formed object even if probing fails.
        self._stream = None
        self._stream_lock = threading.RLock()
        self._is_running = False

        # Voice activity detection
        self.is_voice_active = False
        self.voice_activity_threshold = 10.0
        self.voice_activity_count = 0
        self.voice_activity_lock = threading.RLock()

        self.device_index = self._find_device()
        if self.device_index is None:
            raise RuntimeError("❌ 未找到可用的音频输入设备")

        # Echo cancellation
        self.self_voice_detector = get_self_voice_detector()
        self.enable_echo_cancellation = True

        logger.info(
            f"🎙️ 录音器初始化: 声道={self.channels}, 目标采样率={self.target_rate}, 实际采样率={self.actual_rate}, 块大小={self.chunk}")
        logger.info(
            f" 使用设备[{self.device_index}]: {sd.query_devices(self.device_index)['name']}")
        # Warn when the device could not be opened at the requested rate.
        if self.actual_rate != self.target_rate:
            logger.warning(
                f"⚠️ 采样率不匹配: 目标={self.target_rate}Hz, 实际={self.actual_rate}Hz")

    # ----------------- Device selection -----------------
    def _find_device(self):
        """Choose an input device and set ``self.actual_rate`` accordingly.

        Devices with at least one input channel are ordered by a
        platform-specific list of preferred name fragments, then tried in
        three passes: the target sample rate, each device's default sample
        rate, and finally the first device without any further check.

        Returns:
            The index of the chosen device, or ``None`` when there is no
            input device at all.
        """
        devices = [
            (i, d) for i, d in enumerate(sd.query_devices())
            if d.get("max_input_channels", 0) > 0
        ]
        if not devices:
            return None

        if self.platform == 'linux':
            priority = ["pulse", "default", "sysdefault",
                        "AIUI-USB-MC", "rockchip-es8388"]
        else:
            priority = ["AIUI-USB-MC", "rockchip-es8388",
                        "USB Audio", "sysdefault"]

        def rank(entry):
            # Position of the first preferred fragment found in the device
            # name; devices matching none of them sort last.
            name = entry[1]["name"].lower()
            return next(
                (pos for pos, fragment in enumerate(priority)
                 if fragment.lower() in name),
                999
            )

        devices.sort(key=rank)

        # Pass 1: a device that opens at the requested rate.
        for idx, dev in devices:
            if self._check_sample_rate_support(idx, self.target_rate):
                logger.info(f"📍 选择输入设备: {dev['name']} ({idx})")
                self.actual_rate = self.target_rate
                return idx

        # Pass 2: a device that opens at its own default rate.
        logger.warning(f"⚠️ 未找到支持 {self.target_rate}Hz 的设备,尝试使用默认设备")
        for idx, dev in devices:
            try:
                default_rate = int(dev.get('default_samplerate', 44100))
                if self._check_sample_rate_support(idx, default_rate):
                    logger.info(
                        f"📍 使用默认采样率设备: {dev['name']} ({idx}) - {default_rate}Hz")
                    self.actual_rate = default_rate
                    return idx
            except Exception as e:
                logger.debug(f"设备 {idx} 测试失败: {e}")

        # Pass 3: fall back to the highest-ranked device, unverified.
        # ``devices`` is known to be non-empty here.
        first_idx, first_dev = devices[0]
        logger.warning(
            f"⚠️ 使用第一个可用设备: {first_dev['name']} ({first_idx})")
        self.actual_rate = int(first_dev.get('default_samplerate', 44100))
        return first_idx

    def _check_sample_rate_support(self, device_index, target_rate):
        """Return True if a raw input stream can be created at ``target_rate``.

        The probe constructs a stream with this recorder's channel count,
        dtype and block size, then closes it immediately. Any exception is
        logged and reported as "not supported".
        """
        try:
            stream = sd.RawInputStream(
                samplerate=target_rate,
                channels=self.channels,
                dtype=self.format,
                blocksize=self.chunk,
                device=device_index
            )
            stream.close()
            return True
        except Exception as e:
            logger.warning(f"设备 {device_index} 不支持 {target_rate}Hz: {e}")
            return False

    # ----------------- Stream management -----------------
    @contextmanager
    def audio_stream(self):
        """Context manager that opens and starts the input stream.

        Yields the started stream and always calls ``_close_stream()`` on
        exit, including when creating or starting the stream fails.
        """
        try:
            self._stream = sd.RawInputStream(
                samplerate=self.actual_rate,  # the rate chosen by _find_device()
                channels=self.channels,
                dtype=self.format,
                blocksize=self.chunk,
                device=self.device_index
            )
            self._stream.start()
            self._is_running = True
            logger.info(f"✅ 音频流已启动 (采样率: {self.actual_rate}Hz)")
            yield self._stream
        finally:
            self._close_stream()

    def _close_stream(self):
        """Stop and close the current stream, then reset the stream state.

        Errors are logged rather than raised. ``close()`` is attempted even
        when ``stop()`` fails, so a failing stop does not leave the device
        open.
        """
        stream = self._stream
        try:
            if stream:
                try:
                    if hasattr(stream, 'stop'):
                        stream.stop()
                finally:
                    if hasattr(stream, 'close'):
                        stream.close()
                logger.debug("🔄 音频流已关闭")
        except Exception as e:
            logger.warning(f"⚠️ 关闭音频流异常: {e}")
        finally:
            self._stream = None
            self._is_running = False

    # ----------------- Core recording logic -----------------
    def _voice_activity_update(self, rms: float):
        """Update the voice-activity flag from one RMS measurement.

        The flag turns on after two consecutive measurements above
        ``voice_activity_threshold``; a single measurement at or below the
        threshold resets both the counter and the flag.
        """
        with self.voice_activity_lock:
            if rms > self.voice_activity_threshold:
                self.voice_activity_count += 1
                if self.voice_activity_count >= 2:
                    self.is_voice_active = True
            else:
                self.voice_activity_count = 0
                self.is_voice_active = False

    def read(self):
        """Open the stream and yield audio chunks until recording stops.

        This is a generator. ``_stream_lock`` is held for as long as the
        generator is alive; the loop ends when ``_is_running`` is cleared
        (by ``close()`` / ``stop_recording()``) or when a read fails.

        Yields:
            Byte strings of length ``self.chunk``. Chunks rejected by the
            self-voice detector are dropped.
        """
        with self._stream_lock, self.audio_stream() as stream:
            frame_count = 0
            buffer = b''
            while self._is_running and stream:
                try:
                    # NOTE(review): stream.read() is given self.chunk as a
                    # frame count, while the slicing below treats the same
                    # number as a byte count - confirm this is intended.
                    data, _ = stream.read(self.chunk)
                    buffer += data
                    while len(buffer) >= self.chunk:
                        chunk_data, buffer = buffer[:self.chunk], buffer[self.chunk:]

                        # Echo cancellation: skip chunks the detector rejects.
                        if self.enable_echo_cancellation and self.self_voice_detector:
                            if not self.self_voice_detector.process_recording_audio(chunk_data):
                                continue

                        # RMS and voice activity are only sampled on every
                        # 100th yielded chunk.
                        if frame_count % 100 == 0:
                            rms = calc_rms(chunk_data)
                            self._voice_activity_update(rms)
                            logger.debug(
                                f"🎚️ RMS: {rms:.2f}, 语音活动={self.is_voice_active}")

                        yield chunk_data
                        frame_count += 1
                except Exception as e:
                    logger.error(f"❌ 音频读取异常: {e}")
                    break

    # ----------------- Control interface -----------------
    def close(self):
        """Shut the recorder down and release the stream."""
        # Clear the flag before taking the lock: read() holds _stream_lock
        # for the life of its generator, so a caller on another thread could
        # otherwise never get in to end the read loop.
        self._is_running = False
        with self._stream_lock:
            logger.info("🔄 关闭录音器...")
            self._close_stream()
            logger.info("✅ 录音器已关闭")

    def stop_recording(self):
        """Stop the current recording; the selected device is kept."""
        # Same ordering as close(): signal the read loop first, then wait
        # for the lock.
        self._is_running = False
        with self._stream_lock:
            logger.info("🛑 停止录音...")
            self._close_stream()

    def __del__(self):
        # Best-effort cleanup; errors during finalisation are ignored
        # deliberately.
        try:
            self.close()
        except Exception:
            pass
# ----------------- Debug entry point -----------------
if __name__ == "__main__":
    import time

    recorder = Recorder(chunk=640)
    try:
        # Pull audio for about five seconds, discarding the chunks.
        deadline = time.monotonic() + 5
        for _ in recorder.read():
            if time.monotonic() > deadline:
                break
    finally:
        # Release the device even if reading fails or is interrupted.
        recorder.close()
|