||
- # -*- coding: utf-8 -*-
- import sys
- import os
- """
- 功能: 与物联中控平台交互服务
- """
- __Author__ = "torjean(陶)"
- import base64
- import hashlib
- import hmac
- import os
- import random
- import time
- import requests
- import urllib.parse
- import traceback
- from utils.logger import logger
- from config.config.aiot_config import get_machine_id, get_aiot_host, get_aiot_app_id, get_aiot_app_secret, get_aiot_union_id, get_scenes, get_devices
- home_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- class ControlConf():
- # 物联AIOT - 从配置文件读取
- aiot_unionId = get_aiot_union_id()
- aiot_platform_host = get_aiot_host()
- aiot_appid = get_aiot_app_id()
- aiot_appSecret = get_aiot_app_secret()
- # 机器ID直接从配置文件读取,不再需要本地文件操作
- # 物联交互相关类
- class ControlAIotPlatform:
- def __init__(self, comm_instance=None):
- self.logger = logger
- self.comm_instance = comm_instance
- # 机器id作为userId - 直接从配置文件读取
- self.machine_id = get_machine_id()
- # 用户密钥
- self.user_secretkey = None
- # access-token
- self.access_token = None
- # refresh-token
- self.refresh_token = None
- # 从配置文件加载场景和设备信息
- self._load_config_from_file()
- self.connect()
- def _load_config_from_file(self):
- """从配置文件加载场景和设备信息"""
- try:
- # 加载场景配置
- scenes_config = get_scenes()
- self.scenes_id = scenes_config
- # 加载设备配置并转换为兼容格式
- devices_config = get_devices()
- self.resource_info = {}
- for device_name, device_config in devices_config.items():
- self.resource_info[device_name] = {
- 'devName': device_config.get('dev_name', ''),
- 'productId': device_config.get('product_id', ''),
- 'deviceId': device_config.get('device_id', ''),
- 'resourceId': device_config.get('resource_id', ''),
- 'open': device_config.get('open_value'),
- 'close': device_config.get('close_value'),
- }
- logger.info(
- f"从配置文件加载了 {len(self.scenes_id)} 个场景和 {len(self.resource_info)} 个设备")
- except Exception as e:
- logger.error(f"加载配置文件失败: {e}")
- # 使用默认配置作为后备
- self.scenes_id = {
- "scene_001": '500',
- "scene_002": '501'
- }
- self.resource_info = {}
- # 一、连接物联平台中控系统,获取用户密钥
- def _get_platform_user_secretkey(self):
- current_timestamp = int(time.time())
- try:
- url = ControlConf.aiot_platform_host + '/cloud-api/app/active'
- to_encode_str = (ControlConf.aiot_appid + str(
- current_timestamp) + ControlConf.aiot_unionId + self.machine_id).encode('utf-8')
- # Base64解码appSecretKey
- decode_app_secret = base64.b64decode(ControlConf.aiot_appSecret)
- # hmac_sha256签名
- hmac_encode_sign = hmac.new(
- decode_app_secret, to_encode_str, hashlib.sha256)
- # Base64编码
- base64_encode_sign = base64.b64encode(
- hmac_encode_sign.digest()).decode('utf-8')
- response = requests.post(url, json={
- "appId": ControlConf.aiot_appid,
- "timestamp": current_timestamp,
- "unionId": ControlConf.aiot_unionId,
- "userId": self.machine_id,
- "sign": base64_encode_sign
- }, timeout=3)
- response_json = response.json()
- # logger.info("1.1 获取用户密钥userSecretKey:", response_json)
- if response_json['code'] == 0:
- return True, response_json['data']['userSecret']
- else:
- logger.info("响应失败:", response_json['msg'])
- return False, response_json['msg']
- except Exception as e:
- logger.info(e)
- return False, e
- # 二、连接物联平台中控系统,获取access-token
- def _get_access_token(self):
- url = ControlConf.aiot_platform_host + '/cloud-api/oauth2/token'
- try:
- authorization = self._generate_authorization()
- request_headers = {
- 'Authorization': authorization,
- "Content-Type": "application/x-www-form-urlencoded",
- }
- password = self._generate_password()
- requests_data = {
- "grant_type": 'hmac_sign',
- "username": ControlConf.aiot_appid + '.' + self.machine_id,
- "password": password,
- # 权限
- "scope": 'cloud_write'
- }
- try:
- # logger.info(1111111,request_headers)
- # logger.info(2222222,requests_data)
- encoded_data = urllib.parse.urlencode(requests_data)
- response = requests.post(
- url, headers=request_headers, data=encoded_data, timeout=3)
- response_json = response.json()
- # logger.info("1.2 获取access-token:", response_json)
- if response_json['code'] == 0:
- return True, response_json['data']
- else:
- logger.info("响应失败:", response_json['msg'])
- return False, response_json['msg']
- except requests.exceptions.RequestException as e:
- logger.info("请求失败:", e)
- except Exception as e:
- traceback.print_exc()
- logger.info(e)
- # 生成初始Authorization
- def _generate_authorization(self):
- try:
- ########## 测试案例###########
- # md5_hex_app_secret = hashlib.md5("/uUMzm79BK0RPzI8VBgomSRjngXb5/sH".encode('utf-8')).hexdigest()
- # authorization = 'Basic ' + base64.b64encode(
- # ("NVOPaEwo8S61ytrZ" + ':' + md5_hex_app_secret).encode('utf-8')).decode('utf-8')
- ############################
- # md5 appSecret
- md5_hex_app_secret = hashlib.md5(
- ControlConf.aiot_appSecret.encode('utf-8')).hexdigest()
- # 生成Authorization
- authorization = 'Basic ' + base64.b64encode(
- (ControlConf.aiot_appid + ':' + md5_hex_app_secret).encode('utf-8')).decode('utf-8')
- return authorization
- except Exception as e:
- logger.info(e)
- return None
- # 生成password
- def _generate_password(self):
- try:
- appid = ControlConf.aiot_appid
- userid = self.machine_id
- secure_mode = 'hmac_sign'
- timestamp = int(time.time())
- to_encode_str = (appid + secure_mode +
- str(timestamp) + userid).encode('utf-8')
- # Base64解码userSecretKey
- decoded_user_secret = base64.b64decode(self.user_secretkey)
- # hmac_sha256签名
- hmac_encode_sign = hmac.new(
- decoded_user_secret, to_encode_str, hashlib.sha256)
- # Base64编码
- base64_encode_sign = base64.b64encode(
- hmac_encode_sign.digest()).decode('utf-8')
- # URL 编码
- url_encoded_sign = urllib.parse.quote(base64_encode_sign, safe='')
- password = f'appId={appid}&secureMode={secure_mode}×tamp={timestamp}&userId={userid}&sign={url_encoded_sign}'
- return password
- except Exception as e:
- logger.info(e)
- traceback.print_exc()
- return None
- # 刷新access-token
- def refresh_access_token(self):
- """
- 如果后续接口返回401,则调用该接口刷新access-token
- """
- url = ControlConf.aiot_platform_host + '/cloud-api/oauth2/token'
- try:
- authorization = self._generate_authorization()
- request_headers = {
- 'Authorization': authorization,
- "Content-Type": "application/x-www-form-urlencoded",
- }
- requests_data = {
- "grant_type": 'refresh_token',
- "refresh_token": self.refresh_token
- }
- try:
- # logger.info(1111111,request_headers)
- # logger.info(2222222,requests_data)
- encoded_data = urllib.parse.urlencode(requests_data)
- response = requests.post(
- url, headers=request_headers, data=encoded_data, timeout=3)
- response_json = response.json()
- logger.info("1.3 刷新access-token:", response_json)
- if response_json['code'] == 0:
- self.access_token = response_json['data']['access_token']
- self.refresh_token = response_json['data']['refresh_token']
- return True, ''
- else:
- logger.info("响应失败:", response_json['msg'])
- return False, response_json['msg']
- except requests.exceptions.RequestException as e:
- logger.info("请求失败:", e)
- except Exception as e:
- traceback.print_exc()
- logger.info(e)
- # 连接物联平台中控系统,获取access-token
- def connect(self):
- # 一、获取用户密钥
- connect_flag, user_secret = self._get_platform_user_secretkey()
- if connect_flag is False:
- self.logger.error("1.1、获取用户密钥userSecretKey失败: %s" % user_secret)
- return
- else:
- # logger.info("1.1 成功获取用户密钥userSecretKey:%s" % str(user_secret))
- self.logger.info("1.1、成功获取用户密钥userSecretKey: %s" %
- str(user_secret))
- self.user_secretkey = user_secret
- # self.logger.info("一、获取用户密钥userSecretKey:%s"%self.user_secretkey)
- # 二、获取access-token和refresh-token
- connect_flag, access_token_data = self._get_access_token()
- if connect_flag is False:
- # self.logger.error("二、获取access-token失败:%s"%access_token_data)
- return
- else:
- # logger.info("1.2 成功获取access-token:%s" % str(access_token_data))
- self.logger.info("1.2 成功获取access-token: %s" %
- str(access_token_data))
- self.access_token = access_token_data['access_token']
- self.refresh_token = access_token_data['refresh_token']
- # self.logger.info("二、获取refresh-token:%s"%self.refresh_token)
- # 获取设备列表
- def get_device_list(self):
- url = ControlConf.aiot_platform_host + '/cloud-api/device/list'
- try:
- authorization = "Bearer " + self.access_token
- request_headers = {
- 'Authorization': authorization,
- }
- response = requests.get(url, headers=request_headers, timeout=3)
- response_json = response.json()
- logger.info("2.1 获取设备列表:", response_json)
- if response_json['code'] == 0:
- return True, response_json['data']
- elif response_json['code'] == 401:
- # 如果返回401,则刷新access-token
- self.refresh_access_token()
- return False, "token过期,已刷新,重新调用"
- else:
- logger.info("响应失败:", response_json['msg'])
- return False, response_json['msg']
- except requests.exceptions.RequestException as e:
- logger.info("请求失败:", e)
- # 获取产品资源
- def get_product_resource(self, product_id=''):
- url = ControlConf.aiot_platform_host + '/cloud-api/product/resource'
- try:
- authorization = "Bearer " + self.access_token
- request_headers = {
- 'Authorization': authorization,
- }
- params = {
- "productId": product_id
- }
- response = requests.get(
- url, headers=request_headers, params=params, timeout=3)
- response_json = response.json()
- logger.info("2.2 获取产品资源:", response_json)
- if response_json['code'] == 0:
- return True, response_json['data']
- elif response_json['code'] == 401:
- # 如果返回401,则刷新access-token
- self.refresh_access_token()
- return False, "token过期,已刷新,重新调用"
- else:
- logger.info("响应失败:", response_json['msg'])
- return False, response_json['msg']
- except requests.exceptions.RequestException as e:
- logger.info("请求失败:", e)
- # 控制设备
- def control_device(self, control_detailed_data, device_id='', product_id=''):
- """
- 控制设备
- :param device_id: 设备id
- :param product_id: 产品id
- :param control_detailed_data: 控制数据
- 示例:[{"resourceId":"property.power1","value":1},{"resourceId":"property.power2","value":1}]
- :return: True,''
- """
- url = ControlConf.aiot_platform_host + '/cloud-api/device/control'
- authorization = "Bearer " + self.access_token
- headers = {
- 'Authorization': authorization,
- 'Content-Type': 'application/json',
- }
- requests_data = {
- "productId": product_id,
- "deviceId": device_id,
- "data": control_detailed_data
- }
- def _send_request():
- response = requests.post(
- url, headers=headers, json=requests_data, timeout=3)
- return response.json()
- try:
- response_json = _send_request()
- logger.info("2.3 控制设备:", response_json)
- if response_json['code'] == 401:
- # 如果返回401,则刷新access-token
- self.refresh_access_token()
- logger.info("token过期,已刷新,重试中")
- response_json = _send_request()
- if response_json['code'] == 0:
- return True, response_json
- else:
- logger.info("响应失败:", response_json['msg'])
- return False, response_json
- except requests.exceptions.RequestException as e:
- logger.info("请求失败:", e)
- # 场景列表
- def get_scene_list(self):
- """
- :return: True,[{'acId': '338', 'name': '离家模式'}, {'acId': '389', 'name': '回家模式'}]
- """
- url = ControlConf.aiot_platform_host + '/cloud-api/scene/list'
- try:
- authorization = "Bearer " + self.access_token
- requests_headers = {
- 'Authorization': authorization,
- }
- response = requests.get(url, headers=requests_headers, timeout=3)
- response_json = response.json()
- logger.info("2.4 场景列表为:", response_json)
- if response_json['code'] == 0:
- return True, response_json['data']
- elif response_json['code'] == 401:
- # 如果返回401,则刷新access-token
- self.refresh_access_token()
- return False, "token过期,已刷新,重新调用"
- else:
- logger.info("响应失败:", response_json['msg'])
- return False, response_json['msg']
- except Exception as e:
- logger.info("请求失败:", e)
- return False, e
- # 对外接口,场景执行
- def execute_scene(self, acid_name=''):
- """
- :param acid_name: 场景名字
- :return: True,''
- """
- if self.scenes_id.get(acid_name, '') == '':
- return False, "场景不存在"
- acid = self.scenes_id.get(acid_name, '')
- url = ControlConf.aiot_platform_host + '/cloud-api/scene/control'
- authorization = "Bearer " + self.access_token
- headers = {
- 'Authorization': authorization,
- "Content-Type": "application/json",
- }
- data = {"acId": acid}
- def send_request():
- _response_json = requests.post(
- url, headers=headers, json=data, timeout=3)
- return _response_json.json()
- try:
- response_json = send_request()
- # logger.info("2.5 执行场景:", response_json)
- if response_json['code'] == 401:
- # 如果返回401,则刷新access-token
- self.refresh_access_token()
- # return False, "token过期,已刷新,重新调用"
- logger.info("token过期,已刷新,重试中...")
- response = send_request()
- response_json = response.json()
- if response_json['code'] == 0:
- return True, response_json['data']
- else:
- logger.info("响应失败:", response_json['msg'])
- return False, response_json['msg']
- except Exception as e:
- logger.info("请求失败:", e)
- return False, e
- # 对外接口,控制设备开关
- def control_device_power(self, device_name='', state=''):
- """
- 对外接口,控制设备开关
- :param device_name: 设备名称
- :param state: 开关状态
- :return: True,''
- """
- try:
- dev_dict = self.resource_info.get(device_name, '')
- logger.info("-----------%s-%s-----------" % (state, device_name))
- if dev_dict == '' or state not in ['open', 'close']:
- return False, "设备名称或状态不存在"
- else:
- # 控制数据
- control_detailed_data = [
- {"resourceId": dev_dict['resourceId'],
- "value": dev_dict[state]}
- ]
- res, res_data = self.control_device(device_id=dev_dict['deviceId'], product_id=dev_dict['productId'],
- control_detailed_data=control_detailed_data)
- return res, res_data
- except Exception as e:
- logger.info(e)
- return False, e
- def main(self, stop_event):
- try:
- logger.info("!!!!!!!!!!!可以控制家具!!!!!!!!!!!")
- except Exception as e:
- self.logger.error(f"与物联平台中控系统交互出错: {e}")
- aiot_controller = ControlAIotPlatform()
- if __name__ == "__main__":
- print(home_path)
- # 获取设备列表
- # aiot_controller.get_device_list()
- # 获取具体
- # res, resdata = aiot_controller.get_product_resource("RSD00005")
- # if res is True:
- # logger.info("获取资源RSD00003成功!",resdata)
- # else:
- # logger.info("获取资源RSD00003失败!",resdata)
- # aiot_controller.get_scene_list()
- # aiot_controller.execute_scene('scene_001')
- # 控制设备先写死测试
- # {
- # 'productId': 'KXYP79V2',
- # 'deviceId': '0001200d90395efffe80c4ee',
- # 'deviceName': '卧室灯带',
- # 'place': '卧室',
- # 'resourceId': 'power2',
- # 'connected': True
- # }
- # control_data = [
- # # {"resourceId": "power2", "value": False},
- # # {"resourceId": "power1", "value": False},
- # # 窗帘
- # {"resourceId": "work1.work1", "value": "2"}
- # ]
- # res, resdata = aiot_controller.control_device(device_id='ATARWSA40001B8D61AA720B0', product_id='RSD00005',
- # control_detailed_data=control_data)
- # if res is True:
- # print("客厅筒灯!",resdata)
- # else:
- # print("客厅筒灯!",resdata)
|