''' Author: zhaoyong 77912776@qq.com Date: 2025-08-17 16:59:19 LastEditTime: 2025-08-24 14:11:12 LastEditors: zhaoyong 77912776@qq.com FilePath: \robot_ai\strategies\action\execute_actions.py Description: 头部注释配置模板 ''' from utils.control_aiot import aiot_controller from utils.logger import logger import threading from .action_strategies import ( RealTimeActionStrategy, NaviWayPointStrategy, # AiotSceneStrategy, # AiotDeviceStrategy, CMDActionStrategy ) """ 文件:action_executor.py 功能: 执行动作的类 """ __Author__ = "xueYang" class ActionExecutor: def __init__(self): self._strategies = { f"OS20015785810.ACTIONS": RealTimeActionStrategy(), f"OS20015785810.ACTIONS_TZ": RealTimeActionStrategy(), f"OS20015785810.VoiceNavigation": NaviWayPointStrategy(), # "scene_": AiotSceneStrategy(), # ("open_", "close_"): AiotDeviceStrategy(), f"OS20015785810.CMD_ACTIONS": CMDActionStrategy() } def execute_actions(self, category, skill): """ 使用策略模式执行不同类型的动作 - 异步执行 :param category: 动作分类 :param skill: 具体指令 """ try: if not skill: return # 实时交互、语音导航 if category in (f"OS20015785810.ACTIONS", f"OS20015785810.VoiceNavigation", f"OS20015785810.ACTIONS_TZ", f"OS20015785810.CMD_ACTIONS"): strategy = self._strategies.get(category) if strategy: strategy.execute(skill) else: raise ValueError(f"未知的动作分类: {category}") # 智能家居控制动作 elif category.split('.')[-1] == f"control_aiot": logger.info(f"AIOT分类是:{category}执行AIOT动作:{skill}") # 根据指令类型选择策略 if skill.startswith("scene_"): thread = threading.Thread( target=aiot_controller.execute_scene, args=(skill,), daemon=True ) thread.start() elif skill.startswith(("open_", "close_")): state = skill.split("_", 1)[0] device_name = skill.split("_", 1)[1] logger.info(f"执行设备控制:{device_name} 状态:{state}") thread = threading.Thread( target=aiot_controller.control_device_power, args=(device_name, state), daemon=True ) thread.start() else: raise ValueError(f"未知的AIOT指令: {skill}") except Exception as e: logger.error(f"执行动作时发生错误: {str(e)}") def execute_actions_async(self, category, skill): """ 异步执行动作的方法 :param category: 动作分类 :param skill: 具体指令 """ thread = threading.Thread( target=self.execute_actions, args=(category, skill), daemon=True ) thread.start()