"""Intent handling module.

Receives intent-recognition results, optionally speaks the recognized answer
through TTS, and dispatches the intent to the matching handler (configured
actions, vision analysis, base-station passenger flow, the guessing game, or
Dify model switching).
"""
import hashlib
import json
import random
import threading
import traceback

import json5
import paramiko
import pymysql
import requests
from sshtunnel import SSHTunnelForwarder

from config.config.dify_config import difyconfig
from handlers.dify.recognize_intention import chat_with_dify
from utils.pc2_requests import _send_qa_task, _send_led_color_task
from utils.tts_client import play_text_async
from utils.logger import logger
from config.config.settings import config
from strategies.vision.qwenv import process_base64_image
from strategies.action.execute_actions import ActionExecutor

# Upper bound (seconds) for blocking HTTP calls made while handling an intent,
# so a stalled camera/game service cannot hang the handler forever.
_HTTP_TIMEOUT_SECONDS = 10

# Camera snapshot endpoint used when the configuration has no 'camera_url'.
_DEFAULT_CAMERA_URL = 'http://127.0.0.1:34550/camera_base64'

# Endpoint of the guessing-game service.
_GAME_SERVICE_URL = "http://192.168.123.164:9002/game"

# purpose -> (service cmd, spoken text on success, spoken text on failure,
#             log prefix used when the request itself fails)
_GAME_COMMANDS = {
    "Start_Game": (
        "start_game", "游戏开始", "游戏开始失败, 请稍后重试", "猜拳游戏异常(开始)",
    ),
    "Stop_Game": (
        "stop_game", "游戏结束", "游戏结束失败, 请稍后重试", "猜拳游戏异常(结束)",
    ),
}

# The SSH tunnel exposes the forwarded database port on the local machine, so
# the database client must connect to loopback rather than to the remote host.
_TUNNEL_LOCAL_HOST = "127.0.0.1"


class IntentHandler:
    """Dispatch recognized intents to actions, vision, games and TTS."""

    def __init__(self):
        self.detected_intent = None
        self.tts_text = ""
        # Set once an answer has been sent to TTS, once vision handling has
        # started, or on a RESET_WAKEUP intent; cleared on a WAKEUP intent.
        # While set, recognized answers are not spoken again.
        self.cbm_semantic_processed = False
        self.executor = ActionExecutor()
        # Filler phrases; one is spoken while a vision request is processed.
        self.vision_dealing_words = [
            "我将从多个角度分析",
            "我将从多维视角分析",
            "我将从不同层面分析",
            "我将从多方面评估",
            "我将从多个维度剖析",
        ]

    @staticmethod
    def _start_daemon(target, *args) -> None:
        """Run ``target(*args)`` on a fire-and-forget daemon thread."""
        threading.Thread(target=target, args=args, daemon=True).start()

    def play_vision_phrase(self, type="thinking"):
        """Speak a randomly chosen filler phrase.

        Args:
            type: Accepted for caller compatibility; not used by this method.
        """
        phrase = random.choice(self.vision_dealing_words)
        logger.info(f"播放视觉语气助词: {phrase}")
        play_text_async(phrase, use_cache=True)

    def handle_intent_result(self, data: dict) -> None:
        """Process one intent-recognition result.

        Reads the JSON text at ``content.result.cbm_semantic.text``, and for a
        successful result (``rc == 0``) speaks/forwards the answer and
        dispatches the intent. Any exception is logged and swallowed.

        Args:
            data: Intent payload.
        """
        try:
            # LED feedback is best-effort and must never block intent handling.
            try:
                self._start_daemon(
                    _send_led_color_task, self, "BREATH", "BLUE")
            except Exception as e:
                logger.debug(f"[LED] LED控制失败,不影响意图处理: {e}")

            text_value = (
                data.get('content', {})
                .get('result', {})
                .get('cbm_semantic', {})
                .get('text')
            )
            if not text_value:
                logger.warning("未获取到意图文本")
                return

            intent = json.loads(text_value)
            if intent.get('rc', -1) != 0:
                return

            category = intent.get('category', "")
            logger.info(f"技能结果: {category}")
            # The default skill is ignored entirely.
            if category == "IFLYTEK.mapU":
                logger.info(f"排除默认技能: {category}")
                return

            skill = category.split('.')[-1]
            if skill == "vision":
                if not self._is_confirmed_vision_intent(intent):
                    return
            elif skill == "wake_up":
                self._apply_wake_up(intent)

            iat_txt = intent.get('text', "")
            logger.info(f"意图识别文本: {iat_txt}")
            answer = intent.get('answer', {}).get('text', "")
            logger.info(f"意图识别答案: {answer}")
            if answer:
                self._deliver_answer(answer, skill)

            semantics_list = intent.get("semantic", [])
            if semantics_list:
                skill_purpose = semantics_list[0].get('intent')
                self._handle_detected_intent(
                    category, iat_txt, skill_purpose)
            else:
                logger.warning("未获取到语义信息")
                # Still dispatch the intent, just without a purpose.
                self._handle_detected_intent(category, iat_txt, None)
        except Exception as e:
            logger.error(f"意图处理异常: {e}")

    def _is_confirmed_vision_intent(self, intent: dict) -> bool:
        """Return False when Dify checking is on and rejects the vision intent."""
        logger.info("使用dify识别视觉意图")
        logger.info(
            f"difyconfig.get_vision_switch(): {difyconfig.get_vision_switch()}")
        if not difyconfig.get_vision_switch():
            return True
        res = chat_with_dify(question=intent.get('text', ""), image_path='')
        return res == '视觉意图'

    def _apply_wake_up(self, intent: dict) -> None:
        """Update the processed flag from a wake_up intent's first semantic."""
        semantics_list = intent.get("semantic", [])
        if not semantics_list:
            return
        skill_purpose = semantics_list[0].get('intent')
        if skill_purpose == "WAKEUP":
            self.cbm_semantic_processed = False
        elif skill_purpose == "RESET_WAKEUP":
            self.cbm_semantic_processed = True

    def _deliver_answer(self, answer: str, skill: str) -> None:
        """Forward the answer to the QA task and speak it once via TTS."""
        try:
            logger.info("发送QA接口===========> [QA]")
            # Vision intents send their own QA result after image analysis.
            if skill != "vision":
                self._start_daemon(_send_qa_task, self, {"result": answer})
        except Exception as e:
            logger.error(f"QA接口请求失败: {e}")

        if self.cbm_semantic_processed:
            return
        self.cbm_semantic_processed = True
        try:
            play_text_async(answer, use_cache=True)
            logger.info("[意图] 答案播放请求已发送")
        except Exception as e:
            logger.error(f"[意图] 播放请求失败: {e}")
            traceback.print_exc()

    def _handle_detected_intent(self, intent: str, iat_txt: str, purpose) -> None:
        """Dispatch a detected intent to its handler.

        Args:
            intent: Intent (category) name; its last dot-separated component
                selects the built-in handler.
            iat_txt: Original recognized (IAT) text.
            purpose: Value of the first semantic entry's ``intent`` field, or
                None when no semantic information was available.
        """
        config_data = config.get_config()
        categories = config_data.get('xunfei', {}).get('category', [])
        # Categories listed in the configuration go to the action executor.
        if intent in categories:
            self.executor.execute_actions(intent, purpose)
            return

        skill = intent.split('.')[-1]
        if skill == "vision":
            self._handle_vision(intent, iat_txt)
        elif skill == "BASE_STATION":
            self._handle_base_station(purpose)
        elif skill == "Game_Guess":
            self._handle_guess_game(purpose)
        elif skill == "switch_model":
            self._switch_model(purpose)
        else:
            logger.info(f"检测到未知意图: {intent}")

    def _handle_vision(self, intent: str, iat_txt: str) -> None:
        """Grab a camera frame, run vision analysis on it and report the result."""
        logger.info("开始播放语气词")
        self.play_vision_phrase(type="vision")
        logger.info(f"检测到 [{intent}] 意图, 执行视觉相关")
        # Mark as processed so a later answer is not spoken on top of this one.
        self.cbm_semantic_processed = True
        answer_text = '摄像头服务异常'
        try:
            camera_url = config._config_data.get(
                'camera_url', _DEFAULT_CAMERA_URL)
            headers = {"Content-Type": "application/json"}
            response = requests.get(
                camera_url,
                headers=headers,
                json={},
                timeout=_HTTP_TIMEOUT_SECONDS,
            )
            resp_data = json5.loads(response.text)
            if resp_data.get('code') == 200:
                base64_pic = resp_data.get('data').get('base64_content')
                answer_text = process_base64_image(
                    base64_pic, question=iat_txt)
                logger.info(f"视觉调用结果: {answer_text}")
                play_text_async(answer_text)
            else:
                logger.error(f"摄像头服务异常: {str(resp_data.get('code'))}")

            # The QA task receives either the analysis or the fallback text.
            try:
                logger.info("发送QA接口===========> [QA]")
                self._start_daemon(
                    _send_qa_task, self, {"result": answer_text})
            except Exception as e:
                logger.error(f"QA接口请求失败: {e}")
        except Exception as e:
            logger.error(f"摄像头服务异常: {str(e)}")

    def _handle_base_station(self, purpose) -> None:
        """Speak the current and cumulative passenger flow."""
        if purpose != "Base_Stations":
            return
        try:
            flows = self.sql_query()
            # sql_query() returns None on any failure or when no row matched;
            # unpacking that directly would raise TypeError.
            if not flows:
                logger.error("区域基站客流查询失败: 未获取到客流数据")
                return
            sum1, sum2 = flows
            play_text_async(f"实时客流为{sum1}人次,累计客流为{sum2}人次")
        except requests.exceptions.RequestException as e:
            logger.error(f"区域基站客流查询API调用失败: {e}")

    def _handle_guess_game(self, purpose) -> None:
        """Send a start/stop command to the game service and speak the outcome.

        Purposes other than "Start_Game" / "Stop_Game" (including "Ready")
        are ignored.
        """
        if not isinstance(purpose, str):
            return
        command = _GAME_COMMANDS.get(purpose)
        if command is None:
            return
        cmd, ok_text, fail_text, error_label = command
        try:
            response = requests.post(
                url=_GAME_SERVICE_URL,
                json={"target": "game", "cmd": cmd, "id": "cq"},
                timeout=_HTTP_TIMEOUT_SECONDS,
            )
            result = response.json()
            # A missing "code" or a non-object body counts as a failure
            # instead of raising KeyError/TypeError.
            succeeded = isinstance(result, dict) and result.get("code") == 0
            play_text_async(ok_text if succeeded else fail_text)
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers an invalid JSON body on older requests versions.
            logger.error(f"{error_label}: {e}")

    def _switch_model(self, purpose) -> None:
        """Switch the current Dify model when model switching is enabled."""
        if not difyconfig.get_models_switch():
            return
        model_dify_type = difyconfig.get_model_dify_type(purpose)
        if model_dify_type:
            difyconfig.set_current_model(model_dify_type)

    def get_detected_intent(self) -> str:
        """Return the detected intent, or an empty string when unset."""
        return self.detected_intent or ""

    def get_tts_text(self) -> str:
        """Return the TTS text, or an empty string when unset."""
        return self.tts_text or ""

    def is_cbm_semantic_processed(self) -> bool:
        """Return whether the cbm_semantic result is marked as processed."""
        return self.cbm_semantic_processed

    def reset_cbm_semantic_processed(self) -> None:
        """Clear the cbm_semantic processed flag."""
        self.cbm_semantic_processed = False

    def generate_md5(self, userId: str, key: str, full_identifier: str) -> str:
        """Return the upper-case hex MD5 of the three arguments concatenated."""
        combined = f"{userId}{key}{full_identifier}"
        return hashlib.md5(combined.encode('utf-8')).hexdigest().upper()

    def sql_query(self):
        """Read ``flow_sum1`` and ``flow_sum2`` of the 'Current' row.

        Opens an SSH tunnel to the database host, connects through the
        tunnel's local port and queries the configured table.

        Returns:
            The fetched row (``flow_sum1, flow_sum2``), or None when no row
            matched or any error occurred; errors are logged, not raised.
        """
        settings = config._config_data
        try:
            with SSHTunnelForwarder(
                (settings.get('ssh_host'), settings.get('ssh_port')),
                ssh_username=settings.get('ssh_user'),
                ssh_password=settings.get('ssh_pass'),
                remote_bind_address=(
                    settings.get('db_host'),
                    settings.get('db_port'),
                ),
            ) as tunnel:
                # NOTE(review): the log texts below say "update" although this
                # method only reads; kept unchanged for log compatibility.
                logger.info(
                    f"SSH 隧道已建立,开始更新数据库: {settings.get('table')}")
                conn = pymysql.connect(
                    # Connect to the local end of the tunnel; db_host is only
                    # reachable from the SSH server side.
                    host=_TUNNEL_LOCAL_HOST,
                    port=tunnel.local_bind_port,
                    user=settings.get('db_user'),
                    password=settings.get('db_pass'),
                    db=settings.get('db_name'),
                    autocommit=True,
                )
                try:
                    # Connectivity probe before the real query.
                    with conn.cursor() as cur:
                        cur.execute("SELECT 1;")
                        result = cur.fetchone()
                        if result and result[0] == 1:
                            logger.info("数据库连接成功!")
                        else:
                            logger.error("数据库连接失败!")

                    # The table name comes from local configuration; SQL
                    # identifiers cannot be bound as query parameters.
                    select_sql = f"""
                        SELECT flow_sum1, flow_sum2
                        FROM {settings.get('table')}
                        WHERE identity = 'Current'
                    """
                    with conn.cursor() as cur:
                        cur.execute(select_sql)
                        row = cur.fetchone()
                    if row:
                        return row
                    logger.warning("未查询到数据!")
                finally:
                    conn.close()
                    logger.info("数据库连接已关闭")
        except Exception as e:
            logger.error(f"更新数据库失败: {e}")
        return None