""" NLP处理模块 """ import json import threading import time from config.config.dify_config import difyconfig from handlers.dify.recognize_models import chat_with_dify from utils.logger import logger from strategies.vision.qwenv import qwen_nlp from utils.pc2_requests import _send_qa_task, _send_led_color_task class NLPHandler: """NLP处理类""" def __init__(self): self.detected_intent = None self.tts_text = "" self.accumulated_text = "" # 累积的文本片段 self.is_complete = False # 是否完成 self.intent_handler = None # 意图cbm处理器引用 self.iat_handler = None # IAT处理器引用 def handle_nlp_result(self, data: dict) -> None: """ 处理NLP结果 Args: data: NLP数据 """ try: try: # 异步调用 _send_led_color_task led_thread = threading.Thread( target=_send_led_color_task, args=(self, "BREATH", "BLUE"), daemon=True ) led_thread.start() except Exception as e: logger.debug(f"[LED] LED控制失败,不影响NLP处理: {e}") # logger.info('NLPdata', json.dumps(data, ensure_ascii=False, indent=2)) # 提取 text 字段 text_value = data.get('content', {}).get( 'result', {}).get('nlp', {}).get('text') status_value = data.get('content', {}).get( 'result', {}).get('nlp', {}).get('status') logger.info( f"🔍 NLP数据解析: text='{text_value}', status={status_value}") if text_value is not None and status_value is not None: logger.info(f"讯飞大模型回答结果是: {text_value} {status_value}") # 如果是新的对话开始(status为0且accumulated_text为空),重置状态 if status_value == 0 and not self.accumulated_text: logger.info("🔄 新对话开始,重置状态") self._reset_state() ################################### # 调用dify模型 logger.info( f"🔍 检查dify配置: switch={difyconfig.get_models_switch()}, current_mode={difyconfig.get_current_mode()}") if difyconfig.get_models_switch(): logger.info("✅ dify模型已启用") # nlp所有文字发完了,调用dify模型 if status_value == 2 and difyconfig.get_current_mode() != "xunfei": logger.info("🚀 调用dify模型处理NLP") # 从意图处理器获取iat_txt if self.iat_handler and hasattr(self.iat_handler, 'iat_txt'): question = self.iat_handler.iat_txt else: question = "请帮我分析一下" # 调用dify模型处理nlp self._handle_nlp_by_dify(question) return else: logger.info( f"📝 不调用dify模型: status={status_value}, current_mode={difyconfig.get_current_mode()}") else: logger.info("❌ dify模型未启用") ################################## logger.info("🎯 准备调用_parse_nlp_result方法") # 解析NLP结果 self._parse_nlp_result(text_value, status_value) # 输出处理结果 if self.tts_text: logger.info(f"成功提取回答: {self.tts_text}") else: logger.info("未成功提取回答") if self.detected_intent: logger.info(f"成功提取意图: {self.detected_intent}") self._handle_detected_intent(self.detected_intent) else: logger.info("未检测到预设动作指令意图") else: logger.warning("NLP数据不完整") except Exception as e: logger.error(f"NLP处理异常: {e}") import traceback logger.error(f"异常堆栈: {traceback.format_exc()}") def _parse_nlp_result(self, text_value: str, status_value: int) -> None: """ 解析NLP结果并合并文本片段 Args: text_value: NLP文本结果 status_value: 状态值(0:继续, 1:结束, 2:完成) """ try: # 检查是否完成(status为2表示最终完成) if status_value == 2: logger.info(f"🎯 检测到结束状态,当前累积文本: '{self.accumulated_text}'") # 即使文本为空,也要处理结束逻辑 if text_value and len(text_value.strip()) > 0: # 累积文本片段(包括标点符号) self.accumulated_text += text_value.strip() logger.debug(f"累积文本: {self.accumulated_text}") # 验证累积文本的有效性 if len(self.accumulated_text.strip()) < 1: logger.warning(f"累积文本过短,可能无效: '{self.accumulated_text}'") # 重置状态,不处理无效结果 self._reset_state() return self.tts_text = self.accumulated_text.strip() self.is_complete = True logger.info(f"🎉 回答完成,合并结果: {self.tts_text}") # 去除所有空格 # self.tts_text = self.tts_text.replace(" ", "") # logger.info(f"🔧 去除空格后: {self.tts_text}") # 定义可能的触发词组(讯飞回答不出,qwen补充回答) error_phrases = [ "对不起", "没有明确的含义", "上下文", "无法理解你的问题", "无法理解", "无法提供", "无法回答", "没有明确", "没有理解", "没有理解你的问题", "没有理解你的意图", "没有理解你的需求", "没有理解你的请求", "没有理解你的问题", "没有明确的解释", "没有明确的含义" ] # 检查是否包含任一词组 if any(phrase in self.tts_text for phrase in error_phrases): logger.info(f"🚨 检测到错误词组,调用千问模型") # 从IAT处理器获取iat_txt if self.iat_handler and hasattr(self.iat_handler, 'iat_txt'): question = self.iat_handler.iat_txt else: question = "请帮我分析一下" answer_text = qwen_nlp(question=question) logger.info('千问大模型回答', answer_text) self.tts_text = answer_text # 检查是否已处理cbm_semantic,避免重复发送 if not (self.intent_handler and self.intent_handler.is_cbm_semantic_processed()): logger.info("📤 准备发送QA任务") # 异步调用 _send_qa_task,添加异常处理 try: qa_thread = threading.Thread( target=_send_qa_task, args=(self, {"result": self.tts_text}), daemon=True ) qa_thread.start() logger.info("✅ QA任务已启动") except Exception as e: logger.debug(f"[QA] QA请求失败,不影响NLP处理: {e}") else: logger.info("检测到已处理cbm_semantic,跳过QA发送") logger.info(f"🎵 准备调用播放方法,TTS文本: '{self.tts_text}'") # 智能分割并播放完整文本 self._play_complete_text() # 重置累积状态 self._reset_state() return # 非结束状态的处理 # 验证文本有效性 if not text_value or len(text_value.strip()) == 0: logger.debug("文本为空,跳过处理") return # 检查是否只是标点符号 stripped_text = text_value.strip() if len(stripped_text) == 1 and stripped_text in '。,!?;:""''()【】': logger.debug(f"跳过纯标点符号: {stripped_text}") return # 累积文本片段 self.accumulated_text += stripped_text logger.debug(f"累积文本: {self.accumulated_text}") logger.debug(f"回答未完成,继续等待: {self.accumulated_text}") except Exception as e: logger.error(f"NLP结果解析异常: {e}") # 异常情况下也重置状态 self._reset_state() def _play_complete_text(self) -> None: """播放完整的合并文本""" try: logger.info(f"🎯 开始播放完整文本,TTS文本: '{self.tts_text}'") if not self.tts_text: logger.warning("⚠️ TTS文本为空,无法播放") return # 检查是否已处理cbm_semantic,避免重复播放 if self.intent_handler and self.intent_handler.is_cbm_semantic_processed(): logger.info("检测到已处理cbm_semantic,跳过NLP播放") # 重置cbm_semantic处理标记,为下次处理做准备 self.intent_handler.reset_cbm_semantic_processed() return logger.info(f"🎵 准备播放文本: {self.tts_text}") # 直接播放完整文本,不进行分割 from utils.tts_client import play_text_async play_text_async(self.tts_text, use_cache=True) logger.info(f"✅ 已调用TTS播放: {self.tts_text}") # 播放完成后立即重置状态,防止重复处理 self.tts_text = "" self.is_complete = False except Exception as e: logger.error(f"❌ 完整文本播放失败: {e}") # 降级处理:如果播放失败,记录错误 logger.error(f"TTS播放失败,无法播放文本: {self.tts_text}") # 异常情况下也重置状态 self.tts_text = "" self.is_complete = False def _handle_nlp_by_dify(self, question: str) -> None: """ 解析NLP结果并合并文本片段 Args: text_value: NLP文本结果 status_value: 状态值(0:继续, 1:结束, 2:完成) """ try: self.is_complete = True # 调用dify模型回答 print("dify模型问题::::::::::::::::::::::", question) answer_text = chat_with_dify(question=question) print("dify模型回答::::::::::::::::::::::", answer_text) self.tts_text = answer_text # 检查是否已处理cbm_semantic,避免重复发送 if not self.intent_handler.is_cbm_semantic_processed(): # 异步调用 _send_qa_task,添加异常处理 try: qa_thread = threading.Thread( target=_send_qa_task, args=(self, {"result": self.tts_text}), daemon=True ) qa_thread.start() except Exception as e: logger.debug(f"[QA] QA请求失败,不影响NLP处理: {e}") else: logger.info("检测到已处理cbm_semantic,跳过QA发送") # 智能分割并播放完整文本 self._play_complete_text() # 重置累积状态 self.accumulated_text = "" self.is_complete = False except Exception as e: logger.error(f"NLP结果解析异常: {e}") def _reset_state(self) -> None: """重置状态,准备处理新的对话""" self.tts_text = "" self.accumulated_text = "" self.is_complete = False self.detected_intent = None # 重置语音处理器的语气助词状态 try: from handlers.speech_handler import SpeechHandler # 这里我们需要通过消息处理器来重置语音处理器状态 # 由于模块间依赖关系,我们通过日志提示需要重置 logger.debug("需要重置语音处理器语气助词状态") except Exception as e: logger.debug(f"重置语音处理器状态时出现异常: {e}") logger.debug("已重置NLP处理器状态,准备处理新对话") def _handle_detected_intent(self, intent: str) -> None: """ 处理检测到的意图 Args: intent: 意图名称 """ if intent == "hi": logger.info(f"检测到 [{intent}] 意图, 执行打招呼动作") elif intent == "hand": logger.info(f"检测到 [{intent}] 意图, 执行握手动作") elif intent == "tour": logger.info(f"检测到 [{intent}] 意图, 执行实验室游览动作") elif intent == "Bow": logger.info(f"检测到 [{intent}] 意图, 执行鞠躬欢送动作") elif intent == "Nod": logger.info(f"检测到 [{intent}] 意图, 执行点头动作") def get_detected_intent(self) -> str: """获取检测到的意图""" return self.detected_intent or "" def get_tts_text(self) -> str: """获取TTS文本""" return self.tts_text or ""