execute_actions.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. '''
  2. Author: zhaoyong 77912776@qq.com
  3. Date: 2025-08-17 16:59:19
  4. LastEditTime: 2025-08-24 14:11:12
  5. LastEditors: zhaoyong 77912776@qq.com
  6. FilePath: \robot_ai\strategies\action\execute_actions.py
  7. Description: 头部注释配置模板
  8. '''
  9. from utils.control_aiot import aiot_controller
  10. from utils.logger import logger
  11. import threading
  12. from .action_strategies import (
  13. RealTimeActionStrategy,
  14. NaviWayPointStrategy,
  15. # AiotSceneStrategy,
  16. # AiotDeviceStrategy,
  17. CMDActionStrategy
  18. )
  19. """
  20. 文件:action_executor.py
  21. 功能: 执行动作的类
  22. """
  23. __Author__ = "xueYang"
  24. class ActionExecutor:
  25. def __init__(self):
  26. self._strategies = {
  27. f"OS20015785810.ACTIONS": RealTimeActionStrategy(),
  28. f"OS20015785810.ACTIONS_TZ": RealTimeActionStrategy(),
  29. f"OS20015785810.VoiceNavigation": NaviWayPointStrategy(),
  30. # "scene_": AiotSceneStrategy(),
  31. # ("open_", "close_"): AiotDeviceStrategy(),
  32. f"OS20015785810.CMD_ACTIONS": CMDActionStrategy()
  33. }
  34. def execute_actions(self, category, skill):
  35. """
  36. 使用策略模式执行不同类型的动作 - 异步执行
  37. :param category: 动作分类
  38. :param skill: 具体指令
  39. """
  40. try:
  41. if not skill:
  42. return
  43. # 实时交互、语音导航
  44. if category in (f"OS20015785810.ACTIONS",
  45. f"OS20015785810.VoiceNavigation",
  46. f"OS20015785810.ACTIONS_TZ",
  47. f"OS20015785810.CMD_ACTIONS"):
  48. strategy = self._strategies.get(category)
  49. if strategy:
  50. strategy.execute(skill)
  51. else:
  52. raise ValueError(f"未知的动作分类: {category}")
  53. # 智能家居控制动作
  54. elif category.split('.')[-1] == f"control_aiot":
  55. logger.info(f"AIOT分类是:{category}执行AIOT动作:{skill}")
  56. # 根据指令类型选择策略
  57. if skill.startswith("scene_"):
  58. thread = threading.Thread(
  59. target=aiot_controller.execute_scene,
  60. args=(skill,),
  61. daemon=True
  62. )
  63. thread.start()
  64. elif skill.startswith(("open_", "close_")):
  65. state = skill.split("_", 1)[0]
  66. device_name = skill.split("_", 1)[1]
  67. logger.info(f"执行设备控制:{device_name} 状态:{state}")
  68. thread = threading.Thread(
  69. target=aiot_controller.control_device_power,
  70. args=(device_name, state),
  71. daemon=True
  72. )
  73. thread.start()
  74. else:
  75. raise ValueError(f"未知的AIOT指令: {skill}")
  76. except Exception as e:
  77. logger.error(f"执行动作时发生错误: {str(e)}")
  78. def execute_actions_async(self, category, skill):
  79. """
  80. 异步执行动作的方法
  81. :param category: 动作分类
  82. :param skill: 具体指令
  83. """
  84. thread = threading.Thread(
  85. target=self.execute_actions,
  86. args=(category, skill),
  87. daemon=True
  88. )
  89. thread.start()