message_processor.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. """
  2. 消息处理核心逻辑模块
  3. """
  4. from utils.logger import logger
  5. import struct
  6. import json
  7. from typing import Optional, Tuple
  8. from core.socket_client import SocketClient
  9. from strategies.confirm_process import ConfirmProcess
  10. from strategies.aiui_process import AiuiMessageProcess
  11. from handlers.xunfei.intent_handler import IntentHandler
  12. from handlers.xunfei.speech_handler import XunfeiSpeechHandler
  13. from handlers.xunfei.nlp_handler import NLPHandler
  14. from handlers.xunfei.knowledge_handler import KnowledgeHandler
  15. class MessageProcessor:
  16. """消息处理核心类"""
  17. def __init__(self, socket_client: SocketClient):
  18. self.socket_client = socket_client
  19. self.aiui_type = ""
  20. # 初始化策略处理器
  21. self.confirm_process = ConfirmProcess()
  22. self.aiui_process = AiuiMessageProcess()
  23. # 初始化业务处理器
  24. self.intent_handler = IntentHandler()
  25. self.speech_handler = XunfeiSpeechHandler() # 使用讯飞AIUI处理器
  26. self.nlp_handler = NLPHandler()
  27. self.knowledge_handler = KnowledgeHandler()
  28. # 把意图类注册到语音处理器
  29. self.speech_handler.intent_handler = self.intent_handler
  30. # 设置NLP处理器的意图处理器引用
  31. self.nlp_handler.intent_handler = self.intent_handler
  32. # 设置NLP处理器的IAT处理器引用
  33. self.nlp_handler.iat_handler = self.speech_handler
  34. def process(self) -> bool:
  35. """
  36. 处理接收到的消息
  37. Returns:
  38. bool: 处理是否成功
  39. """
  40. try:
  41. socket = self.socket_client.get_socket()
  42. if not socket:
  43. logger.error("Socket未连接,重新连接...")
  44. self.socket_client.connected_event.clear()
  45. self.socket_client.connect()
  46. return False
  47. # 设置接收超时
  48. socket.settimeout(3)
  49. # 接收消息头
  50. recv_data = self.socket_client.receive_full_data(7)
  51. if not recv_data:
  52. # logger.error("未接收到数据,重新连接...")
  53. self.socket_client.connected_event.clear()
  54. self.socket_client.connect()
  55. return False
  56. if len(recv_data) < 7:
  57. logger.error(f"数据不完整: {recv_data}")
  58. return False
  59. # 解析消息头
  60. sync_head, user_id, msg_type, msg_length, msg_id = struct.unpack(
  61. '<BBBHH', recv_data)
  62. # 接收消息体
  63. msg_data = self.socket_client.receive_full_data(msg_length + 1)
  64. if len(msg_data) < msg_length + 1:
  65. logger.error(f"消息体数据不完整: {msg_data}")
  66. return False
  67. # 解析消息数据
  68. msg = msg_data[:msg_length]
  69. check_code = msg_data[-1] # 校验码
  70. # 处理消息
  71. if sync_head == 0xa5 and user_id == 0x01:
  72. return self._handle_message(msg_type, msg_id, msg)
  73. return True
  74. except Exception as e:
  75. logger.error(f"消息处理异常: {e}")
  76. return False
  77. def _handle_message(self, msg_type: int, msg_id: int, msg: bytes) -> bool:
  78. """
  79. 处理具体消息类型
  80. Args:
  81. msg_type: 消息类型
  82. msg_id: 消息ID
  83. msg: 消息内容
  84. Returns:
  85. bool: 处理结果
  86. """
  87. try:
  88. socket = self.socket_client.get_socket()
  89. if msg_type == 0x01:
  90. # 确认消息
  91. self.confirm_process.process(socket, msg_id)
  92. elif msg_type == 0x04:
  93. # AIUI消息
  94. self.confirm_process.process(socket, msg_id)
  95. success, result = self.aiui_process.process(socket, msg)
  96. if success:
  97. self.aiui_type = ""
  98. data = json.loads(result)
  99. self._get_aiui_type(data)
  100. # 处理唤醒事件
  101. if data.get('content', {}).get('eventType', {}) == 4:
  102. logger.info(f"唤醒成功:===={msg_id} 我在 ====")
  103. from utils.tts_client import play_text_async
  104. play_text_async('我在呢,有什么可以帮您的?', use_cache=True)
  105. # 根据AIUI类型处理
  106. if self.aiui_type == "iat":
  107. self.speech_handler.handle_iat_result(data)
  108. elif self.aiui_type == "nlp":
  109. self.nlp_handler.handle_nlp_result(data)
  110. elif self.aiui_type == "cbm_semantic":
  111. self.intent_handler.handle_intent_result(data)
  112. elif self.aiui_type == "cbm_knowledge":
  113. self.knowledge_handler.handle_knowledge_result(data)
  114. else:
  115. logger.warning("AIUI消息处理失败")
  116. return True
  117. except Exception as e:
  118. logger.error(f"消息类型处理异常: {e}")
  119. return False
  120. def _get_aiui_type(self, data: dict) -> None:
  121. """
  122. 获取AIUI类型
  123. Args:
  124. data: AIUI数据
  125. """
  126. if 'content' in data:
  127. content = data['content']
  128. if 'info' in content:
  129. info = content['info']
  130. if 'data' in info and isinstance(info['data'], list) and len(info['data']) > 0:
  131. data_item = info['data'][0]
  132. if 'params' in data_item:
  133. params = data_item['params']
  134. sub_value = params.get('sub')
  135. if sub_value is not None:
  136. self.aiui_type = sub_value