| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- """
- 机器人AI语音识别主程序入口
- Author: zhaoyong 77912776@qq.com
- Date: 2025-08-19
- LastEditTime: 2025-08-24
- """
- import time
- from datetime import datetime
- import signal
- from typing import Optional
- from utils.load_config import load_config
- from utils.logger import setup_logger, logger
- from handlers.aiui.EventListener import EventListener
- from handlers.aiui.AIui_node import AIUINode
- from handlers.baidu.speech_handler import BaiduSpeechHandler
- from handlers.speech_handler import SpeechHandler
- from core.message_processor import MessageProcessor
- from core.socket_client import SocketClient
- from utils.init_system import initialize_robot_system
- import sys
- class RobotAI:
- """机器人AI主类"""
- def __init__(self):
- self.run = True
- self.socket_client: Optional[SocketClient] = None
- self.message_processor: Optional[MessageProcessor] = None
- self.speech_handler: Optional[SpeechHandler] = None
- self.baidu_handler: Optional[BaiduSpeechHandler] = None
- self.xunfei_linux_node: Optional[AIUINode] = None
- signal.signal(signal.SIGINT, self._stop_handler)
- setup_logger('robot_ai', 'logs')
- self.config = load_config()
- self.speech_config = self.config.get('speech_recognition', {})
- self.service_type = self.speech_config.get('service', 'xunfei_aiui')
- self.is_xunfei_linux = self.service_type == 'xunfei_linux'
- self._init_components()
- def _stop_handler(self, signum, frame):
- """信号处理函数"""
- logger.info(f"收到信号 {signum},正在停止...")
- self.run = False
- def _init_components(self):
- """初始化语音识别组件"""
- logger.info("正在初始化机器人AI语音识别组件...")
- try:
- if self.service_type == 'baidu_realtime':
- self._init_baidu()
- elif self.is_xunfei_linux:
- self._init_xunfei_linux()
- else:
- self._init_xunfei_aiui()
- logger.info("机器人AI语音识别初始化完成")
- except Exception as e:
- logger.error(f"系统初始化失败: {e}")
- raise
- def _init_baidu(self):
- """初始化百度语音识别"""
- logger.info("使用百度实时语音识别服务")
- self.baidu_handler = BaiduSpeechHandler()
- if not self.baidu_handler.start_recognition():
- raise RuntimeError("百度实时语音识别启动失败")
- logger.info("百度实时语音识别启动成功")
- def _init_xunfei_linux(self):
- """初始化讯飞 Linux SDK"""
- logger.info("使用讯飞 Linux SDK 语音识别服务")
- event_listener = EventListener(skills_dict={}, debug=False)
- self.xunfei_linux_node = AIUINode(event_listener, debug=False)
- logger.info("讯飞 Linux SDK 初始化成功")
- def _init_xunfei_aiui(self):
- """初始化讯飞 AIUI"""
- logger.info("使用讯飞 AIUI 语音识别服务")
- self.speech_handler = SpeechHandler()
- self.socket_client = SocketClient()
- self.message_processor = MessageProcessor(self.socket_client)
- def start(self):
- """启动系统"""
- logger.info("机器人AI语音识别启动")
- try:
- if self.service_type == 'baidu_realtime':
- self._run_baidu()
- elif self.is_xunfei_linux:
- self._run_xunfei_linux()
- else:
- self._run_xunfei_aiui()
- except Exception as e:
- logger.error(f"系统运行异常: {e}")
- finally:
- self.stop()
- def _run_baidu(self):
- logger.info("百度实时语音识别服务运行中...")
- while self.run:
- time.sleep(1)
- def _run_xunfei_linux(self):
- logger.info("讯飞 Linux SDK 服务运行中...")
- if self.xunfei_linux_node:
- self.xunfei_linux_node.start(spin=True)
- def _run_xunfei_aiui(self):
- logger.info("讯飞 AIUI 服务运行中...")
- while self.run and self.message_processor:
- self.message_processor.process()
- def stop(self):
- """停止系统"""
- logger.info("正在停止机器人AI语音识别...")
- try:
- if self.baidu_handler and self.service_type == 'baidu_realtime':
- self.baidu_handler.stop_recognition()
- if self.xunfei_linux_node and self.is_xunfei_linux:
- self.xunfei_linux_node.shutdown()
- if self.socket_client:
- self.socket_client.close()
- logger.info("机器人AI语音识别已停止")
- except Exception as e:
- logger.error(f"停止过程中出现异常: {e}")
- def main():
- """主函数"""
- logger.info("=== 机器人AI主程序启动 ===")
- logger.info(f"启动时间: {datetime.now()}")
- try:
- # 先进行系统级初始化(音频/组件等)
- initialize_robot_system()
- robot = RobotAI()
- robot.start()
- logger.info("机器人AI主程序正常退出")
- return 0
- except KeyboardInterrupt:
- logger.info("收到键盘中断信号,程序退出")
- return 0
- except Exception as e:
- import traceback
- logger.error(f"程序启动失败: {e}")
- logger.error(f"错误详情: {traceback.format_exc()}")
- return 1
- if __name__ == '__main__':
- sys.exit(main())
|