| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
- '''
- Author: zhaoyong 77912776@qq.com
- Date: 2025-08-17 16:59:19
- LastEditTime: 2025-08-24 14:11:12
- LastEditors: zhaoyong 77912776@qq.com
- FilePath: \robot_ai\strategies\action\execute_actions.py
- Description: Dispatches recognised skills to per-category action strategies and to AIOT scene/device control.
- '''
- from utils.control_aiot import aiot_controller
- from utils.logger import logger
- import threading
- from .action_strategies import (
- RealTimeActionStrategy,
- NaviWayPointStrategy,
- # AiotSceneStrategy,
- # AiotDeviceStrategy,
- CMDActionStrategy
- )
- """
- File: execute_actions.py
- Purpose: defines ActionExecutor, the class that executes actions.
- """
- __Author__ = "xueYang"
class ActionExecutor:
    """Route a skill to the handler that matches its category.

    Two kinds of category are handled:

    * categories registered in ``self._strategies`` are delegated to the
      strategy object stored under that key;
    * categories whose last dotted segment is ``control_aiot`` are treated
      as smart-home commands and forwarded to ``aiot_controller`` on a
      daemon thread.

    Any other category is ignored without logging.
    """

    # Shared prefix of the strategy category keys.
    # NOTE(review): presumably an app/skill identifier — confirm before reuse.
    _CATEGORY_PREFIX = "OS20015785810"
    # Last dotted segment of a category that marks an AIOT command.
    _AIOT_CATEGORY_SUFFIX = "control_aiot"
    # Skill prefixes recognised inside an AIOT category.
    _AIOT_SCENE_PREFIX = "scene_"
    _AIOT_POWER_PREFIXES = ("open_", "close_")

    def __init__(self):
        """Build the category -> strategy registry used for dispatch."""
        prefix = self._CATEGORY_PREFIX
        # This dict is the single source of truth for strategy dispatch:
        # registering a new key here is enough for execute_actions to use it.
        # AIOT scene/device commands are not registered; they are handled
        # directly in _execute_aiot.
        self._strategies = {
            f"{prefix}.ACTIONS": RealTimeActionStrategy(),
            f"{prefix}.ACTIONS_TZ": RealTimeActionStrategy(),
            f"{prefix}.VoiceNavigation": NaviWayPointStrategy(),
            f"{prefix}.CMD_ACTIONS": CMDActionStrategy(),
        }

    def execute_actions(self, category, skill):
        """Execute ``skill`` using the handler selected by ``category``.

        A registered strategy is invoked in the calling thread through
        ``strategy.execute(skill)``. An AIOT command is started on a daemon
        thread and this method returns without waiting for it. Use
        ``execute_actions_async`` to move the whole dispatch off the
        calling thread.

        Exceptions raised during dispatch are logged and never propagated.
        An empty/falsy ``skill`` and an unrecognised ``category`` are both
        no-ops.

        :param category: action category
        :param skill: concrete command to execute
        """
        try:
            if not skill:
                return
            strategy = self._strategies.get(category)
            if strategy is not None:
                # Real-time interaction, voice navigation, CMD actions.
                strategy.execute(skill)
            elif category.split(".")[-1] == self._AIOT_CATEGORY_SUFFIX:
                # Smart-home control.
                self._execute_aiot(category, skill)
        except Exception as e:
            # Boundary handler: a failing action must not crash the caller.
            logger.error(f"执行动作时发生错误: {e}")

    def _execute_aiot(self, category, skill):
        """Start a daemon thread that performs the AIOT command ``skill``.

        ``scene_*`` skills are passed whole to ``aiot_controller.execute_scene``;
        ``open_*`` / ``close_*`` skills are split at the first underscore into
        a state and a device name for ``aiot_controller.control_device_power``.

        :param category: AIOT category, used only for logging
        :param skill: AIOT command string
        :raises ValueError: if ``skill`` has none of the recognised prefixes
        """
        logger.info(f"AIOT分类是:{category}执行AIOT动作:{skill}")
        if skill.startswith(self._AIOT_SCENE_PREFIX):
            target, args = aiot_controller.execute_scene, (skill,)
        elif skill.startswith(self._AIOT_POWER_PREFIXES):
            # "open_living_room_light" -> ("open", "living_room_light")
            state, _, device_name = skill.partition("_")
            logger.info(f"执行设备控制:{device_name} 状态:{state}")
            target, args = aiot_controller.control_device_power, (device_name, state)
        else:
            raise ValueError(f"未知的AIOT指令: {skill}")
        threading.Thread(target=target, args=args, daemon=True).start()

    def execute_actions_async(self, category, skill):
        """Run ``execute_actions`` on a daemon thread and return immediately.

        :param category: action category
        :param skill: concrete command to execute
        """
        worker = threading.Thread(
            target=self.execute_actions,
            args=(category, skill),
            daemon=True,
        )
        worker.start()
|