action_strategies.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. import re
  2. from utils.logger import logger
  3. import threading
  4. import requests
  5. from config.config.settings import config
  6. from .base_strategy import ActionStrategy
  7. from config.config.action_config import get_action_dict, get_point_dict
  8. """
  9. 文件:action_strategies.py
  10. 功能: 定义具体的策略类
  11. """
  12. __Author__ = "xueYang"
  13. # 使用全局配置实例,无需重复加载
  14. API_ACTION_URL = config.get_pc2_url('robot_action')
  15. API_WAYPOINT_URL = config.get_pc2_url('robot_waypoint')
  16. API_CMD_URL = config.get_pc2_url('robot_cmd_action')
  17. API_TTS_URL = config.get_config().get('music_url')
  18. class RealTimeActionStrategy(ActionStrategy):
  19. def execute(self, skill: str):
  20. """执行实时交互动作"""
  21. logger.info(f"执行动作:{skill}")
  22. thread = threading.Thread(
  23. target=self._execute_real_time_action,
  24. args=(skill,),
  25. daemon=True
  26. )
  27. thread.start()
  28. def _execute_real_time_action(self, skill: str):
  29. """实际执行实时交互动作的方法"""
  30. # 这里实现具体的实时交互逻辑
  31. try:
  32. logger.info(f"执行动作:{skill}")
  33. action_dict = get_action_dict()
  34. logger.info(f"动作字典: {action_dict}")
  35. param = action_dict.get(skill)
  36. logger.info(f"获取到的动作参数: {param}")
  37. logger.info(
  38. "========================API接口===========================: %s", API_ACTION_URL)
  39. # if skill
  40. if param is not None:
  41. if re.fullmatch(r'PLAY_MUSIC', skill):
  42. logger.info(f"检测到音乐播放指令: {skill}")
  43. print(f"检测到音乐播放指令: {skill}")
  44. self.send_tts_request("play", "dodge.mp3")
  45. if re.fullmatch(r'STOP_MUSIC', skill):
  46. logger.info(f"检测到音乐停止指令: {skill}")
  47. self.send_tts_request("stop", "dodge.mp3")
  48. try:
  49. response = requests.post(API_ACTION_URL, json={"data": param}, timeout=5)
  50. logger.info(f"API响应状态: {response.status_code}")
  51. except requests.exceptions.RequestException as e:
  52. logger.error(f"请求执行动作失败: {str(e)}")
  53. else:
  54. logger.error(f"未找到动作 {skill} 对应的参数")
  55. except Exception as e:
  56. logger.error(f"执行动作时发生错误: {str(e)}")
  57. def send_tts_request(self, control_type: str, tts_file: str):
  58. payload = {"control_type": control_type, "tts_file": tts_file}
  59. try:
  60. response = requests.post(API_TTS_URL, json=payload, timeout=5)
  61. if response.status_code == 200:
  62. logger.info(f"TTS请求成功: {control_type}, 文件: {tts_file}")
  63. else:
  64. logger.warning(f"TTS请求返回异常状态码: {response.status_code}, 文件: {tts_file}")
  65. except requests.exceptions.RequestException as e:
  66. logger.error(f"TTS请求失败: {str(e)}, 文件: {tts_file}")
  67. class NaviWayPointStrategy(ActionStrategy):
  68. def execute(self, skill: str):
  69. """执行点控动作"""
  70. logger.info(f"执行点控:{skill}")
  71. thread = threading.Thread(
  72. target=self._execute_navigation,
  73. args=(skill,),
  74. daemon=True
  75. )
  76. thread.start()
  77. def _execute_navigation(self, skill: str):
  78. """实际执行点位动作的方法"""
  79. # 这里实现具体的点控逻辑
  80. try:
  81. logger.info(f"执行点控:{skill}")
  82. param = get_point_dict().get(skill)
  83. requests.post(API_WAYPOINT_URL, json={"data": param})
  84. except requests.exceptions.RequestException as e:
  85. logger.error(f"请求API失败: {str(e)}")
  86. class CMDActionStrategy(ActionStrategy):
  87. def execute(self, skill: str):
  88. """执行CMD动作"""
  89. logger.info(f"执行CMD动作:{skill}")
  90. thread = threading.Thread(
  91. target=self._execute_cmd_action,
  92. args=(skill,),
  93. daemon=True
  94. )
  95. thread.start()
  96. def _execute_cmd_action(self, skill: str):
  97. try:
  98. logger.info(f"执行CMD动作:{skill}")
  99. param = skill
  100. #判断 skill 值为ZWJS时,发送参数为17;JSWRJZQ 时,发送参数为18
  101. if skill == "ZWJS":
  102. param = "17"
  103. elif skill == "JSWRJZQ":
  104. param = "18"
  105. requests.post(API_CMD_URL, json={"data": param})
  106. except requests.exceptions.RequestException as e:
  107. logger.error(f"请求API失败: {str(e)}")
  108. # class AiotSceneStrategy(ActionStrategy):
  109. # def execute(self, skill: str):
  110. # """执行AIOT场景动作"""
  111. # logger.info(f"执行AIOT场景动作:{skill}")
  112. # thread = threading.Thread(
  113. # target=aiot_controller.execute_scene,
  114. # args=(skill,),
  115. # daemon=True
  116. # )
  117. # thread.start()
  118. #
  119. # class AiotDeviceStrategy(ActionStrategy):
  120. # def execute(self, skills: str):
  121. # """执行AIOT设备控制动作"""
  122. # state = skills.split("_", 1)[0]
  123. # device_name = skills.split("_", 1)[1]
  124. # logger.info(f"执行设备控制:{device_name} 状态:{state}")
  125. #
  126. # thread = threading.Thread(
  127. # target=aiot_controller.control_device_power,
  128. # args=(device_name, state),
  129. # daemon=True
  130. # )
  131. # thread.start()