nlp_handler.py 14 KB


  1. """
  2. NLP处理模块
  3. """
  4. import json
  5. import threading
  6. import time
  7. from config.config.dify_config import difyconfig
  8. from handlers.dify.recognize_models import chat_with_dify
  9. from utils.logger import logger
  10. from strategies.vision.qwenv import qwen_nlp
  11. from utils.pc2_requests import _send_qa_task, _send_led_color_task
  12. class NLPHandler:
  13. """NLP处理类"""
  14. def __init__(self):
  15. self.detected_intent = None
  16. self.tts_text = ""
  17. self.accumulated_text = "" # 累积的文本片段
  18. self.is_complete = False # 是否完成
  19. self.intent_handler = None # 意图cbm处理器引用
  20. self.iat_handler = None # IAT处理器引用
  21. def handle_nlp_result(self, data: dict) -> None:
  22. """
  23. 处理NLP结果
  24. Args:
  25. data: NLP数据
  26. """
  27. try:
  28. try:
  29. # 异步调用 _send_led_color_task
  30. led_thread = threading.Thread(
  31. target=_send_led_color_task,
  32. args=(self, "BREATH", "BLUE"),
  33. daemon=True
  34. )
  35. led_thread.start()
  36. except Exception as e:
  37. logger.debug(f"[LED] LED控制失败,不影响NLP处理: {e}")
  38. # logger.info('NLPdata', json.dumps(data, ensure_ascii=False, indent=2))
  39. # 提取 text 字段
  40. text_value = data.get('content', {}).get(
  41. 'result', {}).get('nlp', {}).get('text')
  42. status_value = data.get('content', {}).get(
  43. 'result', {}).get('nlp', {}).get('status')
  44. logger.info(
  45. f"🔍 NLP数据解析: text='{text_value}', status={status_value}")
  46. if text_value is not None and status_value is not None:
  47. logger.info(f"讯飞大模型回答结果是: {text_value} {status_value}")
  48. # 如果是新的对话开始(status为0且accumulated_text为空),重置状态
  49. if status_value == 0 and not self.accumulated_text:
  50. logger.info("🔄 新对话开始,重置状态")
  51. self._reset_state()
  52. ###################################
  53. # 调用dify模型
  54. logger.info(
  55. f"🔍 检查dify配置: switch={difyconfig.get_models_switch()}, current_mode={difyconfig.get_current_mode()}")
  56. if difyconfig.get_models_switch():
  57. logger.info("✅ dify模型已启用")
  58. # nlp所有文字发完了,调用dify模型
  59. if status_value == 2 and difyconfig.get_current_mode() != "xunfei":
  60. logger.info("🚀 调用dify模型处理NLP")
  61. # 从意图处理器获取iat_txt
  62. if self.iat_handler and hasattr(self.iat_handler, 'iat_txt'):
  63. question = self.iat_handler.iat_txt
  64. else:
  65. question = "请帮我分析一下"
  66. # 调用dify模型处理nlp
  67. self._handle_nlp_by_dify(question)
  68. return
  69. else:
  70. logger.info(
  71. f"📝 不调用dify模型: status={status_value}, current_mode={difyconfig.get_current_mode()}")
  72. else:
  73. logger.info("❌ dify模型未启用")
  74. ##################################
  75. logger.info("🎯 准备调用_parse_nlp_result方法")
  76. # 解析NLP结果
  77. self._parse_nlp_result(text_value, status_value)
  78. # 输出处理结果
  79. if self.tts_text:
  80. logger.info(f"成功提取回答: {self.tts_text}")
  81. else:
  82. logger.info("未成功提取回答")
  83. if self.detected_intent:
  84. logger.info(f"成功提取意图: {self.detected_intent}")
  85. self._handle_detected_intent(self.detected_intent)
  86. else:
  87. logger.info("未检测到预设动作指令意图")
  88. else:
  89. logger.warning("NLP数据不完整")
  90. except Exception as e:
  91. logger.error(f"NLP处理异常: {e}")
  92. import traceback
  93. logger.error(f"异常堆栈: {traceback.format_exc()}")
  94. def _parse_nlp_result(self, text_value: str, status_value: int) -> None:
  95. """
  96. 解析NLP结果并合并文本片段
  97. Args:
  98. text_value: NLP文本结果
  99. status_value: 状态值(0:继续, 1:结束, 2:完成)
  100. """
  101. try:
  102. # 检查是否完成(status为2表示最终完成)
  103. if status_value == 2:
  104. logger.info(f"🎯 检测到结束状态,当前累积文本: '{self.accumulated_text}'")
  105. # 即使文本为空,也要处理结束逻辑
  106. if text_value and len(text_value.strip()) > 0:
  107. # 累积文本片段(包括标点符号)
  108. self.accumulated_text += text_value.strip()
  109. logger.debug(f"累积文本: {self.accumulated_text}")
  110. # 验证累积文本的有效性
  111. if len(self.accumulated_text.strip()) < 1:
  112. logger.warning(f"累积文本过短,可能无效: '{self.accumulated_text}'")
  113. # 重置状态,不处理无效结果
  114. self._reset_state()
  115. return
  116. self.tts_text = self.accumulated_text.strip()
  117. self.is_complete = True
  118. logger.info(f"🎉 回答完成,合并结果: {self.tts_text}")
  119. # 去除所有空格
  120. # self.tts_text = self.tts_text.replace(" ", "")
  121. # logger.info(f"🔧 去除空格后: {self.tts_text}")
  122. # 定义可能的触发词组(讯飞回答不出,qwen补充回答)
  123. error_phrases = [
  124. "对不起",
  125. "没有明确的含义",
  126. "上下文",
  127. "无法理解你的问题",
  128. "无法理解",
  129. "无法提供",
  130. "无法回答",
  131. "没有明确",
  132. "没有理解",
  133. "没有理解你的问题",
  134. "没有理解你的意图",
  135. "没有理解你的需求",
  136. "没有理解你的请求",
  137. "没有理解你的问题",
  138. "没有明确的解释",
  139. "没有明确的含义"
  140. ]
  141. # 检查是否包含任一词组
  142. if any(phrase in self.tts_text for phrase in error_phrases):
  143. logger.info(f"🚨 检测到错误词组,调用千问模型")
  144. # 从IAT处理器获取iat_txt
  145. if self.iat_handler and hasattr(self.iat_handler, 'iat_txt'):
  146. question = self.iat_handler.iat_txt
  147. else:
  148. question = "请帮我分析一下"
  149. answer_text = qwen_nlp(question=question)
  150. logger.info('千问大模型回答', answer_text)
  151. self.tts_text = answer_text
  152. # 检查是否已处理cbm_semantic,避免重复发送
  153. if not (self.intent_handler and self.intent_handler.is_cbm_semantic_processed()):
  154. logger.info("📤 准备发送QA任务")
  155. # 异步调用 _send_qa_task,添加异常处理
  156. try:
  157. qa_thread = threading.Thread(
  158. target=_send_qa_task,
  159. args=(self, {"result": self.tts_text}),
  160. daemon=True
  161. )
  162. qa_thread.start()
  163. logger.info("✅ QA任务已启动")
  164. except Exception as e:
  165. logger.debug(f"[QA] QA请求失败,不影响NLP处理: {e}")
  166. else:
  167. logger.info("检测到已处理cbm_semantic,跳过QA发送")
  168. logger.info(f"🎵 准备调用播放方法,TTS文本: '{self.tts_text}'")
  169. # 智能分割并播放完整文本
  170. self._play_complete_text()
  171. # 重置累积状态
  172. self._reset_state()
  173. return
  174. # 非结束状态的处理
  175. # 验证文本有效性
  176. if not text_value or len(text_value.strip()) == 0:
  177. logger.debug("文本为空,跳过处理")
  178. return
  179. # 检查是否只是标点符号
  180. stripped_text = text_value.strip()
  181. if len(stripped_text) == 1 and stripped_text in '。,!?;:""''()【】':
  182. logger.debug(f"跳过纯标点符号: {stripped_text}")
  183. return
  184. # 累积文本片段
  185. self.accumulated_text += stripped_text
  186. logger.debug(f"累积文本: {self.accumulated_text}")
  187. logger.debug(f"回答未完成,继续等待: {self.accumulated_text}")
  188. except Exception as e:
  189. logger.error(f"NLP结果解析异常: {e}")
  190. # 异常情况下也重置状态
  191. self._reset_state()
  192. def _play_complete_text(self) -> None:
  193. """播放完整的合并文本"""
  194. try:
  195. logger.info(f"🎯 开始播放完整文本,TTS文本: '{self.tts_text}'")
  196. if not self.tts_text:
  197. logger.warning("⚠️ TTS文本为空,无法播放")
  198. return
  199. # 检查是否已处理cbm_semantic,避免重复播放
  200. if self.intent_handler and self.intent_handler.is_cbm_semantic_processed():
  201. logger.info("检测到已处理cbm_semantic,跳过NLP播放")
  202. # 重置cbm_semantic处理标记,为下次处理做准备
  203. self.intent_handler.reset_cbm_semantic_processed()
  204. return
  205. logger.info(f"🎵 准备播放文本: {self.tts_text}")
  206. # 直接播放完整文本,不进行分割
  207. from utils.tts_client import play_text_async
  208. play_text_async(self.tts_text, use_cache=True)
  209. logger.info(f"✅ 已调用TTS播放: {self.tts_text}")
  210. # 播放完成后立即重置状态,防止重复处理
  211. self.tts_text = ""
  212. self.is_complete = False
  213. except Exception as e:
  214. logger.error(f"❌ 完整文本播放失败: {e}")
  215. # 降级处理:如果播放失败,记录错误
  216. logger.error(f"TTS播放失败,无法播放文本: {self.tts_text}")
  217. # 异常情况下也重置状态
  218. self.tts_text = ""
  219. self.is_complete = False
  220. def _handle_nlp_by_dify(self, question: str) -> None:
  221. """
  222. 解析NLP结果并合并文本片段
  223. Args:
  224. text_value: NLP文本结果
  225. status_value: 状态值(0:继续, 1:结束, 2:完成)
  226. """
  227. try:
  228. self.is_complete = True
  229. # 调用dify模型回答
  230. print("dify模型问题::::::::::::::::::::::", question)
  231. answer_text = chat_with_dify(question=question)
  232. print("dify模型回答::::::::::::::::::::::", answer_text)
  233. self.tts_text = answer_text
  234. # 检查是否已处理cbm_semantic,避免重复发送
  235. if not self.intent_handler.is_cbm_semantic_processed():
  236. # 异步调用 _send_qa_task,添加异常处理
  237. try:
  238. qa_thread = threading.Thread(
  239. target=_send_qa_task,
  240. args=(self, {"result": self.tts_text}),
  241. daemon=True
  242. )
  243. qa_thread.start()
  244. except Exception as e:
  245. logger.debug(f"[QA] QA请求失败,不影响NLP处理: {e}")
  246. else:
  247. logger.info("检测到已处理cbm_semantic,跳过QA发送")
  248. # 智能分割并播放完整文本
  249. self._play_complete_text()
  250. # 重置累积状态
  251. self.accumulated_text = ""
  252. self.is_complete = False
  253. except Exception as e:
  254. logger.error(f"NLP结果解析异常: {e}")
  255. def _reset_state(self) -> None:
  256. """重置状态,准备处理新的对话"""
  257. self.tts_text = ""
  258. self.accumulated_text = ""
  259. self.is_complete = False
  260. self.detected_intent = None
  261. # 重置语音处理器的语气助词状态
  262. try:
  263. from handlers.speech_handler import SpeechHandler
  264. # 这里我们需要通过消息处理器来重置语音处理器状态
  265. # 由于模块间依赖关系,我们通过日志提示需要重置
  266. logger.debug("需要重置语音处理器语气助词状态")
  267. except Exception as e:
  268. logger.debug(f"重置语音处理器状态时出现异常: {e}")
  269. logger.debug("已重置NLP处理器状态,准备处理新对话")
  270. def _handle_detected_intent(self, intent: str) -> None:
  271. """
  272. 处理检测到的意图
  273. Args:
  274. intent: 意图名称
  275. """
  276. if intent == "hi":
  277. logger.info(f"检测到 [{intent}] 意图, 执行打招呼动作")
  278. elif intent == "hand":
  279. logger.info(f"检测到 [{intent}] 意图, 执行握手动作")
  280. elif intent == "tour":
  281. logger.info(f"检测到 [{intent}] 意图, 执行实验室游览动作")
  282. elif intent == "Bow":
  283. logger.info(f"检测到 [{intent}] 意图, 执行鞠躬欢送动作")
  284. elif intent == "Nod":
  285. logger.info(f"检测到 [{intent}] 意图, 执行点头动作")
  286. def get_detected_intent(self) -> str:
  287. """获取检测到的意图"""
  288. return self.detected_intent or ""
  289. def get_tts_text(self) -> str:
  290. """获取TTS文本"""
  291. return self.tts_text or ""