intent_handler.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321
  1. import threading
  2. from config.config.dify_config import difyconfig
  3. from handlers.dify.recognize_intention import chat_with_dify
  4. import hashlib
  5. import pymysql
  6. from utils.pc2_requests import _send_qa_task, _send_led_color_task
  7. from utils.tts_client import play_text_async
  8. import json5
  9. import random
  10. import json
  11. from utils.logger import logger
  12. import requests
  13. import paramiko
  14. from sshtunnel import SSHTunnelForwarder
  15. from config.config.settings import config
  16. from strategies.vision.qwenv import process_base64_image
  17. from strategies.action.execute_actions import ActionExecutor
  18. from utils.logger import logger
  19. """
  20. 意图处理模块
  21. """
  22. class IntentHandler:
  23. """意图处理类"""
  24. def __init__(self):
  25. self.detected_intent = None
  26. self.tts_text = ""
  27. self.cbm_semantic_processed = False # 标记是否已处理视觉内容
  28. self.executor = ActionExecutor()
  29. self.vision_dealing_words = [
  30. "我将从多个角度分析", # 日常聊天标配,简单自然
  31. "我将从多维视角分析", # 像说"我瞅瞅相关内容",带着行动感
  32. "我将从不同层面分析", # 随口的礼貌,特生活化
  33. "我将从多方面评估", # 直白展现正在动脑筋
  34. "我将从多个维度剖析", # 比"稍等下"正式点,又不刻意
  35. ]
  36. def play_vision_phrase(self, type="thinking"):
  37. """播放语气助词"""
  38. phrase = random.choice(self.vision_dealing_words)
  39. logger.info(f"播放视觉语气助词: {phrase}")
  40. play_text_async(phrase, use_cache=True)
  41. def handle_intent_result(self, data: dict) -> None:
  42. """
  43. 处理意图识别结果
  44. Args:
  45. data: 意图数据
  46. """
  47. try:
  48. try:
  49. # 异步调用 _send_led_color_task
  50. led_thread = threading.Thread(
  51. target=_send_led_color_task,
  52. args=(self, "BREATH", "BLUE"),
  53. daemon=True
  54. )
  55. led_thread.start()
  56. except Exception as e:
  57. logger.debug(f"[LED] LED控制失败,不影响意图处理: {e}")
  58. # logger.info('意图识别data', json.dumps(data, ensure_ascii=False, indent=2))
  59. text_value = data.get('content', {}).get(
  60. 'result', {}).get('cbm_semantic', {}).get('text')
  61. if text_value:
  62. intent = json.loads(text_value)
  63. rc = intent.get('rc', -1)
  64. if rc == 0:
  65. category = intent.get('category', "")
  66. logger.info(f"技能结果: {category}")
  67. # 排除默认技能
  68. if category == "IFLYTEK.mapU":
  69. logger.info(f"排除默认技能: {category}")
  70. return
  71. # 视觉意图
  72. elif category.split('.')[-1] == "vision":
  73. ###################
  74. # 使用dify识别视觉意图
  75. logger.info(f"使用dify识别视觉意图")
  76. logger.info(
  77. f"difyconfig.get_vision_switch(): {difyconfig.get_vision_switch()}")
  78. if difyconfig.get_vision_switch():
  79. # 判断是否是视觉意图
  80. iat_txt = intent.get('text', "")
  81. res = chat_with_dify(
  82. question=iat_txt, image_path='')
  83. if res != '视觉意图':
  84. return
  85. elif category.split('.')[-1] == "wake_up":
  86. semantics_list = intent.get("semantic", [])
  87. if semantics_list:
  88. semantics = semantics_list[0]
  89. skill_purpose = semantics.get('intent')
  90. if skill_purpose == "WAKEUP":
  91. self.cbm_semantic_processed = False
  92. elif skill_purpose == "RESET_WAKEUP":
  93. self.cbm_semantic_processed = True
  94. ###################
  95. iat_txt = intent.get('text', "")
  96. logger.info(f"意图识别文本: {iat_txt}")
  97. answer = intent.get('answer', {}).get('text', "")
  98. logger.info(f"意图识别答案: {answer}")
  99. if answer:
  100. try:
  101. # 异步调用 _send_qa_task
  102. logger.info("发送QA接口===========> [QA]")
  103. if category.split('.')[-1] != "vision":
  104. qa_thread = threading.Thread(
  105. target=_send_qa_task,
  106. args=(self, {"result": answer}),
  107. daemon=True
  108. )
  109. qa_thread.start()
  110. except Exception as e:
  111. logger.error(f"QA接口请求失败: {e}")
  112. if not self.cbm_semantic_processed:
  113. self.cbm_semantic_processed = True
  114. try:
  115. play_text_async(answer, use_cache=True)
  116. logger.info(f"[意图] 答案播放请求已发送")
  117. except Exception as e:
  118. logger.info(f"[意图] 播放请求失败: {e}")
  119. import traceback
  120. traceback.print_exc()
  121. semantics_list = intent.get("semantic", [])
  122. if semantics_list:
  123. semantics = semantics_list[0]
  124. skill_purpose = semantics.get('intent')
  125. # 处理具体意图
  126. self._handle_detected_intent(
  127. category, iat_txt, skill_purpose)
  128. else:
  129. logger.warning("未获取到语义信息")
  130. # 处理具体意图,但不传递skill_purpose
  131. self._handle_detected_intent(
  132. category, iat_txt, None)
  133. else:
  134. logger.warning("未获取到意图文本")
  135. except Exception as e:
  136. logger.error(f"意图处理异常: {e}")
  137. def _handle_detected_intent(self, intent: str, iat_txt: str, purpose) -> None:
  138. """
  139. 处理检测到的意图
  140. Args:
  141. intent: 意图名称
  142. iat_txt: 原IAT内容
  143. """
  144. # 读取yaml配置文件
  145. config_data = config.get_config()
  146. xunfei_config = config_data.get('xunfei', {})
  147. categories = xunfei_config.get('category', [])
  148. if intent in categories:
  149. self.executor.execute_actions(intent, purpose)
  150. # 视觉调用
  151. elif intent.split('.')[-1] == "vision":
  152. # 在识别开始时播放语气助词
  153. logger.info(f"开始播放语气词")
  154. self.play_vision_phrase(type="vision")
  155. logger.info(f"检测到 [{intent}] 意图, 执行视觉相关")
  156. # 标记已处理视觉内容,避免后续NLP重复播放
  157. self.cbm_semantic_processed = True
  158. answer_text = '摄像头服务异常'
  159. try:
  160. camera_url = config._config_data.get(
  161. 'camera_url', 'http://127.0.0.1:34550/camera_base64'
  162. )
  163. headers = {"Content-Type": "application/json"}
  164. data = {} # matches the -d '{}' from curl
  165. response = requests.get(camera_url, headers=headers, json=data)
  166. resp_data = json5.loads(response.text)
  167. # 处理响应
  168. if resp_data.get('code') == 200:
  169. base64_pic = resp_data.get('data').get('base64_content')
  170. answer_text = process_base64_image(
  171. base64_pic, question=iat_txt)
  172. logger.info(f"视觉调用结果: {answer_text}")
  173. play_text_async(answer_text)
  174. else:
  175. logger.error(f"摄像头服务异常: {str(resp_data.get('code'))}")
  176. try:
  177. # 异步调用 _send_qa_task
  178. logger.info("发送QA接口===========> [QA]")
  179. qa_thread = threading.Thread(
  180. target=_send_qa_task,
  181. args=(self, {"result": answer_text}),
  182. daemon=True
  183. )
  184. qa_thread.start()
  185. except Exception as e:
  186. logger.error(f"QA接口请求失败: {e}")
  187. except Exception as e:
  188. logger.error(f"摄像头服务异常: {str(e)}")
  189. elif intent.split('.')[-1] == "BASE_STATION":
  190. try:
  191. match purpose:
  192. case "Base_Stations":
  193. sum1, sum2 = self.sql_query()
  194. play_text_async(f"实时客流为{sum1}人次,累计客流为{sum2}人次")
  195. except requests.exceptions.RequestException as e:
  196. logger.error(f"区域基站客流查询API调用失败: {e}")
  197. elif intent.split('.')[-1] == "Game_Guess":
  198. try:
  199. match purpose:
  200. case "Ready":
  201. pass
  202. case "Start_Game":
  203. try:
  204. response = requests.post(url="http://192.168.123.164:9002/game",
  205. json={"target": "game", "cmd": "start_game", "id": "cq"})
  206. result = response.json()
  207. if result and result["code"] == 0:
  208. play_text_async("游戏开始")
  209. else:
  210. play_text_async("游戏开始失败, 请稍后重试")
  211. except requests.exceptions.RequestException as e:
  212. logger.error(f"猜拳游戏异常(开始): {e}")
  213. case "Stop_Game":
  214. try:
  215. response = requests.post(url="http://192.168.123.164:9002/game",
  216. json={"target": "game", "cmd": "stop_game", "id": "cq"})
  217. result = response.json()
  218. if result and result["code"] == 0:
  219. play_text_async("游戏结束")
  220. else:
  221. play_text_async("游戏结束失败, 请稍后重试")
  222. except requests.exceptions.RequestException as e:
  223. logger.error(f"猜拳游戏异常(结束): {e}")
  224. except requests.exceptions.RequestException as e:
  225. logger.error(f"猜拳游戏执行异常: {e}")
  226. elif intent.split('.')[-1] == "switch_model":
  227. if difyconfig.get_models_switch():
  228. # 切换模型
  229. model_dify_type = difyconfig.get_model_dify_type(purpose)
  230. if model_dify_type:
  231. difyconfig.set_current_model(model_dify_type)
  232. else:
  233. logger.info(f"检测到未知意图: {intent}")
  234. def get_detected_intent(self) -> str:
  235. """获取检测到的意图"""
  236. return self.detected_intent or ""
  237. def get_tts_text(self) -> str:
  238. """获取TTS文本"""
  239. return self.tts_text or ""
  240. def is_cbm_semantic_processed(self) -> bool:
  241. """检查是否已处理cbm_semantic"""
  242. return self.cbm_semantic_processed
  243. def reset_cbm_semantic_processed(self) -> None:
  244. """重置cbm_semantic处理标记"""
  245. self.cbm_semantic_processed = False
  246. def generate_md5(self, userId: str, key: str, full_identifier: str) -> str:
  247. """MD5加密"""
  248. combined = f"{userId}{key}{full_identifier}"
  249. md5_hash = hashlib.md5(combined.encode('utf-8')).hexdigest()
  250. return md5_hash.upper()
  251. def sql_query(self):
  252. try:
  253. with SSHTunnelForwarder(
  254. (config._config_data.get('ssh_host'),
  255. config._config_data.get('ssh_port')),
  256. ssh_username=config._config_data.get('ssh_user'),
  257. ssh_password=config._config_data.get('ssh_pass'),
  258. remote_bind_address=(
  259. config._config_data.get('db_host'),
  260. config._config_data.get('db_port')
  261. )
  262. ) as tunnel:
  263. logger.info(
  264. f"SSH 隧道已建立,开始更新数据库: {config._config_data.get('table')}")
  265. conn = pymysql.connect(
  266. host=config._config_data.get('db_host'),
  267. port=tunnel.local_bind_port,
  268. user=config._config_data.get('db_user'),
  269. password=config._config_data.get('db_pass'),
  270. db=config._config_data.get('db_name'),
  271. autocommit=True
  272. )
  273. try:
  274. with conn.cursor() as cur:
  275. cur.execute("SELECT 1;")
  276. result = cur.fetchone()
  277. if result and result[0] == 1:
  278. logger.info("数据库连接成功!")
  279. else:
  280. logger.error("数据库连接失败!")
  281. select_sql = f"""
  282. SELECT flow_sum1, flow_sum2
  283. FROM {config._config_data.get('table')}
  284. WHERE identity = 'Current'
  285. """
  286. with conn.cursor() as cur:
  287. cur.execute(select_sql)
  288. row = cur.fetchone()
  289. if row:
  290. # flow_sum1, flow_sum2 = row
  291. return row
  292. else:
  293. logger.warning("未查询到数据!")
  294. finally:
  295. conn.close()
  296. logger.info("数据库连接已关闭")
  297. except Exception as e:
  298. logger.error(f"更新数据库失败: {e}")