| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339 |
- """
- NLP处理模块
- """
- import json
- import threading
- import time
- from config.config.dify_config import difyconfig
- from handlers.dify.recognize_models import chat_with_dify
- from utils.logger import logger
- from strategies.vision.qwenv import qwen_nlp
- from utils.pc2_requests import _send_qa_task, _send_led_color_task
- class NLPHandler:
- """NLP处理类"""
- def __init__(self):
- self.detected_intent = None
- self.tts_text = ""
- self.accumulated_text = "" # 累积的文本片段
- self.is_complete = False # 是否完成
- self.intent_handler = None # 意图cbm处理器引用
- self.iat_handler = None # IAT处理器引用
- def handle_nlp_result(self, data: dict) -> None:
- """
- 处理NLP结果
- Args:
- data: NLP数据
- """
- try:
- try:
- # 异步调用 _send_led_color_task
- led_thread = threading.Thread(
- target=_send_led_color_task,
- args=(self, "BREATH", "BLUE"),
- daemon=True
- )
- led_thread.start()
- except Exception as e:
- logger.debug(f"[LED] LED控制失败,不影响NLP处理: {e}")
- # logger.info('NLPdata', json.dumps(data, ensure_ascii=False, indent=2))
- # 提取 text 字段
- text_value = data.get('content', {}).get(
- 'result', {}).get('nlp', {}).get('text')
- status_value = data.get('content', {}).get(
- 'result', {}).get('nlp', {}).get('status')
- logger.info(
- f"🔍 NLP数据解析: text='{text_value}', status={status_value}")
- if text_value is not None and status_value is not None:
- logger.info(f"讯飞大模型回答结果是: {text_value} {status_value}")
- # 如果是新的对话开始(status为0且accumulated_text为空),重置状态
- if status_value == 0 and not self.accumulated_text:
- logger.info("🔄 新对话开始,重置状态")
- self._reset_state()
- ###################################
- # 调用dify模型
- logger.info(
- f"🔍 检查dify配置: switch={difyconfig.get_models_switch()}, current_mode={difyconfig.get_current_mode()}")
- if difyconfig.get_models_switch():
- logger.info("✅ dify模型已启用")
- # nlp所有文字发完了,调用dify模型
- if status_value == 2 and difyconfig.get_current_mode() != "xunfei":
- logger.info("🚀 调用dify模型处理NLP")
- # 从意图处理器获取iat_txt
- if self.iat_handler and hasattr(self.iat_handler, 'iat_txt'):
- question = self.iat_handler.iat_txt
- else:
- question = "请帮我分析一下"
- # 调用dify模型处理nlp
- self._handle_nlp_by_dify(question)
- return
- else:
- logger.info(
- f"📝 不调用dify模型: status={status_value}, current_mode={difyconfig.get_current_mode()}")
- else:
- logger.info("❌ dify模型未启用")
- ##################################
- logger.info("🎯 准备调用_parse_nlp_result方法")
- # 解析NLP结果
- self._parse_nlp_result(text_value, status_value)
- # 输出处理结果
- if self.tts_text:
- logger.info(f"成功提取回答: {self.tts_text}")
- else:
- logger.info("未成功提取回答")
- if self.detected_intent:
- logger.info(f"成功提取意图: {self.detected_intent}")
- self._handle_detected_intent(self.detected_intent)
- else:
- logger.info("未检测到预设动作指令意图")
- else:
- logger.warning("NLP数据不完整")
- except Exception as e:
- logger.error(f"NLP处理异常: {e}")
- import traceback
- logger.error(f"异常堆栈: {traceback.format_exc()}")
- def _parse_nlp_result(self, text_value: str, status_value: int) -> None:
- """
- 解析NLP结果并合并文本片段
- Args:
- text_value: NLP文本结果
- status_value: 状态值(0:继续, 1:结束, 2:完成)
- """
- try:
- # 检查是否完成(status为2表示最终完成)
- if status_value == 2:
- logger.info(f"🎯 检测到结束状态,当前累积文本: '{self.accumulated_text}'")
- # 即使文本为空,也要处理结束逻辑
- if text_value and len(text_value.strip()) > 0:
- # 累积文本片段(包括标点符号)
- self.accumulated_text += text_value.strip()
- logger.debug(f"累积文本: {self.accumulated_text}")
- # 验证累积文本的有效性
- if len(self.accumulated_text.strip()) < 1:
- logger.warning(f"累积文本过短,可能无效: '{self.accumulated_text}'")
- # 重置状态,不处理无效结果
- self._reset_state()
- return
- self.tts_text = self.accumulated_text.strip()
- self.is_complete = True
- logger.info(f"🎉 回答完成,合并结果: {self.tts_text}")
- # 去除所有空格
- # self.tts_text = self.tts_text.replace(" ", "")
- # logger.info(f"🔧 去除空格后: {self.tts_text}")
- # 定义可能的触发词组(讯飞回答不出,qwen补充回答)
- error_phrases = [
- "对不起",
- "没有明确的含义",
- "上下文",
- "无法理解你的问题",
- "无法理解",
- "无法提供",
- "无法回答",
- "没有明确",
- "没有理解",
- "没有理解你的问题",
- "没有理解你的意图",
- "没有理解你的需求",
- "没有理解你的请求",
- "没有理解你的问题",
- "没有明确的解释",
- "没有明确的含义"
- ]
- # 检查是否包含任一词组
- if any(phrase in self.tts_text for phrase in error_phrases):
- logger.info(f"🚨 检测到错误词组,调用千问模型")
- # 从IAT处理器获取iat_txt
- if self.iat_handler and hasattr(self.iat_handler, 'iat_txt'):
- question = self.iat_handler.iat_txt
- else:
- question = "请帮我分析一下"
- answer_text = qwen_nlp(question=question)
- logger.info('千问大模型回答', answer_text)
- self.tts_text = answer_text
- # 检查是否已处理cbm_semantic,避免重复发送
- if not (self.intent_handler and self.intent_handler.is_cbm_semantic_processed()):
- logger.info("📤 准备发送QA任务")
- # 异步调用 _send_qa_task,添加异常处理
- try:
- qa_thread = threading.Thread(
- target=_send_qa_task,
- args=(self, {"result": self.tts_text}),
- daemon=True
- )
- qa_thread.start()
- logger.info("✅ QA任务已启动")
- except Exception as e:
- logger.debug(f"[QA] QA请求失败,不影响NLP处理: {e}")
- else:
- logger.info("检测到已处理cbm_semantic,跳过QA发送")
- logger.info(f"🎵 准备调用播放方法,TTS文本: '{self.tts_text}'")
- # 智能分割并播放完整文本
- self._play_complete_text()
- # 重置累积状态
- self._reset_state()
- return
- # 非结束状态的处理
- # 验证文本有效性
- if not text_value or len(text_value.strip()) == 0:
- logger.debug("文本为空,跳过处理")
- return
- # 检查是否只是标点符号
- stripped_text = text_value.strip()
- if len(stripped_text) == 1 and stripped_text in '。,!?;:""''()【】':
- logger.debug(f"跳过纯标点符号: {stripped_text}")
- return
- # 累积文本片段
- self.accumulated_text += stripped_text
- logger.debug(f"累积文本: {self.accumulated_text}")
- logger.debug(f"回答未完成,继续等待: {self.accumulated_text}")
- except Exception as e:
- logger.error(f"NLP结果解析异常: {e}")
- # 异常情况下也重置状态
- self._reset_state()
- def _play_complete_text(self) -> None:
- """播放完整的合并文本"""
- try:
- logger.info(f"🎯 开始播放完整文本,TTS文本: '{self.tts_text}'")
- if not self.tts_text:
- logger.warning("⚠️ TTS文本为空,无法播放")
- return
- # 检查是否已处理cbm_semantic,避免重复播放
- if self.intent_handler and self.intent_handler.is_cbm_semantic_processed():
- logger.info("检测到已处理cbm_semantic,跳过NLP播放")
- # 重置cbm_semantic处理标记,为下次处理做准备
- self.intent_handler.reset_cbm_semantic_processed()
- return
- logger.info(f"🎵 准备播放文本: {self.tts_text}")
- # 直接播放完整文本,不进行分割
- from utils.tts_client import play_text_async
- play_text_async(self.tts_text, use_cache=True)
- logger.info(f"✅ 已调用TTS播放: {self.tts_text}")
- # 播放完成后立即重置状态,防止重复处理
- self.tts_text = ""
- self.is_complete = False
- except Exception as e:
- logger.error(f"❌ 完整文本播放失败: {e}")
- # 降级处理:如果播放失败,记录错误
- logger.error(f"TTS播放失败,无法播放文本: {self.tts_text}")
- # 异常情况下也重置状态
- self.tts_text = ""
- self.is_complete = False
- def _handle_nlp_by_dify(self, question: str) -> None:
- """
- 解析NLP结果并合并文本片段
- Args:
- text_value: NLP文本结果
- status_value: 状态值(0:继续, 1:结束, 2:完成)
- """
- try:
- self.is_complete = True
- # 调用dify模型回答
- print("dify模型问题::::::::::::::::::::::", question)
- answer_text = chat_with_dify(question=question)
- print("dify模型回答::::::::::::::::::::::", answer_text)
- self.tts_text = answer_text
- # 检查是否已处理cbm_semantic,避免重复发送
- if not self.intent_handler.is_cbm_semantic_processed():
- # 异步调用 _send_qa_task,添加异常处理
- try:
- qa_thread = threading.Thread(
- target=_send_qa_task,
- args=(self, {"result": self.tts_text}),
- daemon=True
- )
- qa_thread.start()
- except Exception as e:
- logger.debug(f"[QA] QA请求失败,不影响NLP处理: {e}")
- else:
- logger.info("检测到已处理cbm_semantic,跳过QA发送")
- # 智能分割并播放完整文本
- self._play_complete_text()
- # 重置累积状态
- self.accumulated_text = ""
- self.is_complete = False
- except Exception as e:
- logger.error(f"NLP结果解析异常: {e}")
- def _reset_state(self) -> None:
- """重置状态,准备处理新的对话"""
- self.tts_text = ""
- self.accumulated_text = ""
- self.is_complete = False
- self.detected_intent = None
- # 重置语音处理器的语气助词状态
- try:
- from handlers.speech_handler import SpeechHandler
- # 这里我们需要通过消息处理器来重置语音处理器状态
- # 由于模块间依赖关系,我们通过日志提示需要重置
- logger.debug("需要重置语音处理器语气助词状态")
- except Exception as e:
- logger.debug(f"重置语音处理器状态时出现异常: {e}")
- logger.debug("已重置NLP处理器状态,准备处理新对话")
- def _handle_detected_intent(self, intent: str) -> None:
- """
- 处理检测到的意图
- Args:
- intent: 意图名称
- """
- if intent == "hi":
- logger.info(f"检测到 [{intent}] 意图, 执行打招呼动作")
- elif intent == "hand":
- logger.info(f"检测到 [{intent}] 意图, 执行握手动作")
- elif intent == "tour":
- logger.info(f"检测到 [{intent}] 意图, 执行实验室游览动作")
- elif intent == "Bow":
- logger.info(f"检测到 [{intent}] 意图, 执行鞠躬欢送动作")
- elif intent == "Nod":
- logger.info(f"检测到 [{intent}] 意图, 执行点头动作")
- def get_detected_intent(self) -> str:
- """获取检测到的意图"""
- return self.detected_intent or ""
- def get_tts_text(self) -> str:
- """获取TTS文本"""
- return self.tts_text or ""
|