message_processor.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. """
  2. 消息处理核心逻辑模块
  3. """
  4. from utils.logger import logger
  5. import struct
  6. import json
  7. from typing import Optional, Tuple
  8. from core.socket_client import SocketClient
  9. from strategies.confirm_process import ConfirmProcess
  10. from strategies.aiui_process import AiuiMessageProcess
  11. from handlers.xunfei.intent_handler import IntentHandler
  12. from handlers.xunfei.speech_handler import XunfeiSpeechHandler
  13. from handlers.xunfei.nlp_handler import NLPHandler
  14. from handlers.xunfei.knowledge_handler import KnowledgeHandler
  15. class XunfeiMessageProcessor:
  16. """消息处理核心类"""
  17. def __init__(self, socket_client: SocketClient):
  18. self.socket_client = socket_client
  19. self.aiui_type = ""
  20. # 初始化策略处理器
  21. self.confirm_process = ConfirmProcess()
  22. self.aiui_process = AiuiMessageProcess()
  23. # 初始化业务处理器
  24. self.intent_handler = IntentHandler()
  25. self.speech_handler = XunfeiSpeechHandler() # 使用讯飞AIUI处理器
  26. self.nlp_handler = NLPHandler()
  27. self.knowledge_handler = KnowledgeHandler()
  28. # 把意图类注册到语音处理器
  29. self.speech_handler.intent_handler = self.intent_handler
  30. # 设置NLP处理器的意图处理器引用
  31. self.nlp_handler.intent_handler = self.intent_handler
  32. # 把iat处理器注册到nlp
  33. self.nlp_handler.iat_handler = self.speech_handler
  34. def process(self) -> bool:
  35. """
  36. 处理接收到的消息
  37. Returns:
  38. bool: 处理是否成功
  39. """
  40. try:
  41. socket = self.socket_client.get_socket()
  42. if not socket:
  43. logger.debug("Socket未连接,等待连接恢复...")
  44. # 不主动触发连接,避免多重连接
  45. return False
  46. # 检查连接状态
  47. if not self.socket_client.is_connected():
  48. logger.debug("Socket连接已断开,等待重连...")
  49. return False
  50. # 设置接收超时 - 增加超时时间,减少频繁失败
  51. socket.settimeout(5) # 从3秒增加到5秒
  52. # 接收消息头
  53. recv_data = self.socket_client.receive_full_data(7)
  54. if not recv_data:
  55. logger.debug("未接收到数据,可能是连接空闲或断开")
  56. # 检查socket是否仍然有效
  57. try:
  58. # 发送一个小的测试数据来检查连接
  59. socket.send(b'\x00')
  60. return True # 连接正常,只是没有数据
  61. except Exception:
  62. logger.debug("Socket连接已断开")
  63. self.socket_client.connected_event.clear()
  64. return False
  65. if len(recv_data) < 7:
  66. logger.error(f"数据不完整: {recv_data}")
  67. return False
  68. # 解析消息头
  69. sync_head, user_id, msg_type, msg_length, msg_id = struct.unpack(
  70. '<BBBHH', recv_data)
  71. # 接收消息体
  72. msg_data = self.socket_client.receive_full_data(msg_length + 1)
  73. if len(msg_data) < msg_length + 1:
  74. logger.error(f"消息体数据不完整: {msg_data}")
  75. return False
  76. # 解析消息数据
  77. msg = msg_data[:msg_length]
  78. check_code = msg_data[-1] # 校验码
  79. # 处理消息
  80. if sync_head == 0xa5 and user_id == 0x01:
  81. return self._handle_message(msg_type, msg_id, msg)
  82. return True
  83. except Exception as e:
  84. logger.error(f"消息处理异常: {e}")
  85. return False
  86. def _handle_message(self, msg_type: int, msg_id: int, msg: bytes) -> bool:
  87. """
  88. 处理具体消息类型
  89. Args:
  90. msg_type: 消息类型
  91. msg_id: 消息ID
  92. msg: 消息内容
  93. Returns:
  94. bool: 处理结果
  95. """
  96. try:
  97. socket = self.socket_client.get_socket()
  98. if msg_type == 0x01:
  99. # 确认消息
  100. self.confirm_process.process(socket, msg_id)
  101. elif msg_type == 0x04:
  102. # AIUI消息
  103. self.confirm_process.process(socket, msg_id)
  104. success, result = self.aiui_process.process(socket, msg)
  105. if success:
  106. self.aiui_type = ""
  107. data = json.loads(result)
  108. self._get_aiui_type(data)
  109. # 处理唤醒事件
  110. if data.get('content', {}).get('eventType', {}) == 4:
  111. logger.info(f"唤醒成功:===={msg_id} 我在 ====")
  112. from utils.tts_client import play_text_async
  113. play_text_async('我在呢,有什么可以帮您的?', use_cache=True)
  114. # 根据AIUI类型处理
  115. if self.aiui_type == "iat":
  116. self.speech_handler.handle_iat_result(data)
  117. elif self.aiui_type == "cbm_knowledge":
  118. logger.info(f"cbm_knowledge: {data}")
  119. self.knowledge_handler.handle_knowledge_result(data)
  120. elif self.aiui_type == "nlp":
  121. self.nlp_handler.handle_nlp_result(data)
  122. elif self.aiui_type == "cbm_semantic":
  123. self.intent_handler.handle_intent_result(data)
  124. else:
  125. logger.warning("AIUI消息处理失败")
  126. return True
  127. except Exception as e:
  128. logger.error(f"消息类型处理异常: {e}")
  129. return False
  130. def _get_aiui_type(self, data: dict) -> None:
  131. """
  132. 获取AIUI类型
  133. Args:
  134. data: AIUI数据
  135. """
  136. if 'content' in data:
  137. content = data['content']
  138. if 'info' in content:
  139. info = content['info']
  140. if 'data' in info and isinstance(info['data'], list) and len(info['data']) > 0:
  141. data_item = info['data'][0]
  142. if 'params' in data_item:
  143. params = data_item['params']
  144. sub_value = params.get('sub')
  145. if sub_value is not None:
  146. self.aiui_type = sub_value