# control_aiot.py
  1. # -*- coding: utf-8 -*-
  2. import sys
  3. import os
  4. """
  5. 功能: 与物联中控平台交互服务
  6. """
  7. __Author__ = "torjean(陶)"
  8. import base64
  9. import hashlib
  10. import hmac
  11. import os
  12. import random
  13. import time
  14. import requests
  15. import urllib.parse
  16. import traceback
  17. from utils.logger import logger
  18. from config.config.aiot_config import get_machine_id, get_aiot_host, get_aiot_app_id, get_aiot_app_secret, get_aiot_union_id, get_scenes, get_devices
  19. home_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
  20. class ControlConf():
  21. # 物联AIOT - 从配置文件读取
  22. aiot_unionId = get_aiot_union_id()
  23. aiot_platform_host = get_aiot_host()
  24. aiot_appid = get_aiot_app_id()
  25. aiot_appSecret = get_aiot_app_secret()
  26. # 机器ID直接从配置文件读取,不再需要本地文件操作
  27. # 物联交互相关类
  28. class ControlAIotPlatform:
  29. def __init__(self, comm_instance=None):
  30. self.logger = logger
  31. self.comm_instance = comm_instance
  32. # 机器id作为userId - 直接从配置文件读取
  33. self.machine_id = get_machine_id()
  34. # 用户密钥
  35. self.user_secretkey = None
  36. # access-token
  37. self.access_token = None
  38. # refresh-token
  39. self.refresh_token = None
  40. # 从配置文件加载场景和设备信息
  41. self._load_config_from_file()
  42. self.connect()
  43. def _load_config_from_file(self):
  44. """从配置文件加载场景和设备信息"""
  45. try:
  46. # 加载场景配置
  47. scenes_config = get_scenes()
  48. self.scenes_id = scenes_config
  49. # 加载设备配置并转换为兼容格式
  50. devices_config = get_devices()
  51. self.resource_info = {}
  52. for device_name, device_config in devices_config.items():
  53. self.resource_info[device_name] = {
  54. 'devName': device_config.get('dev_name', ''),
  55. 'productId': device_config.get('product_id', ''),
  56. 'deviceId': device_config.get('device_id', ''),
  57. 'resourceId': device_config.get('resource_id', ''),
  58. 'open': device_config.get('open_value'),
  59. 'close': device_config.get('close_value'),
  60. }
  61. logger.info(
  62. f"从配置文件加载了 {len(self.scenes_id)} 个场景和 {len(self.resource_info)} 个设备")
  63. except Exception as e:
  64. logger.error(f"加载配置文件失败: {e}")
  65. # 使用默认配置作为后备
  66. self.scenes_id = {
  67. "scene_001": '500',
  68. "scene_002": '501'
  69. }
  70. self.resource_info = {}
  71. # 一、连接物联平台中控系统,获取用户密钥
  72. def _get_platform_user_secretkey(self):
  73. current_timestamp = int(time.time())
  74. try:
  75. url = ControlConf.aiot_platform_host + '/cloud-api/app/active'
  76. to_encode_str = (ControlConf.aiot_appid + str(
  77. current_timestamp) + ControlConf.aiot_unionId + self.machine_id).encode('utf-8')
  78. # Base64解码appSecretKey
  79. decode_app_secret = base64.b64decode(ControlConf.aiot_appSecret)
  80. # hmac_sha256签名
  81. hmac_encode_sign = hmac.new(
  82. decode_app_secret, to_encode_str, hashlib.sha256)
  83. # Base64编码
  84. base64_encode_sign = base64.b64encode(
  85. hmac_encode_sign.digest()).decode('utf-8')
  86. response = requests.post(url, json={
  87. "appId": ControlConf.aiot_appid,
  88. "timestamp": current_timestamp,
  89. "unionId": ControlConf.aiot_unionId,
  90. "userId": self.machine_id,
  91. "sign": base64_encode_sign
  92. }, timeout=3)
  93. response_json = response.json()
  94. # logger.info("1.1 获取用户密钥userSecretKey:", response_json)
  95. if response_json['code'] == 0:
  96. return True, response_json['data']['userSecret']
  97. else:
  98. logger.info("响应失败:", response_json['msg'])
  99. return False, response_json['msg']
  100. except Exception as e:
  101. logger.info(e)
  102. return False, e
  103. # 二、连接物联平台中控系统,获取access-token
  104. def _get_access_token(self):
  105. url = ControlConf.aiot_platform_host + '/cloud-api/oauth2/token'
  106. try:
  107. authorization = self._generate_authorization()
  108. request_headers = {
  109. 'Authorization': authorization,
  110. "Content-Type": "application/x-www-form-urlencoded",
  111. }
  112. password = self._generate_password()
  113. requests_data = {
  114. "grant_type": 'hmac_sign',
  115. "username": ControlConf.aiot_appid + '.' + self.machine_id,
  116. "password": password,
  117. # 权限
  118. "scope": 'cloud_write'
  119. }
  120. try:
  121. # logger.info(1111111,request_headers)
  122. # logger.info(2222222,requests_data)
  123. encoded_data = urllib.parse.urlencode(requests_data)
  124. response = requests.post(
  125. url, headers=request_headers, data=encoded_data, timeout=3)
  126. response_json = response.json()
  127. # logger.info("1.2 获取access-token:", response_json)
  128. if response_json['code'] == 0:
  129. return True, response_json['data']
  130. else:
  131. logger.info("响应失败:", response_json['msg'])
  132. return False, response_json['msg']
  133. except requests.exceptions.RequestException as e:
  134. logger.info("请求失败:", e)
  135. except Exception as e:
  136. traceback.print_exc()
  137. logger.info(e)
  138. # 生成初始Authorization
  139. def _generate_authorization(self):
  140. try:
  141. ########## 测试案例###########
  142. # md5_hex_app_secret = hashlib.md5("/uUMzm79BK0RPzI8VBgomSRjngXb5/sH".encode('utf-8')).hexdigest()
  143. # authorization = 'Basic ' + base64.b64encode(
  144. # ("NVOPaEwo8S61ytrZ" + ':' + md5_hex_app_secret).encode('utf-8')).decode('utf-8')
  145. ############################
  146. # md5 appSecret
  147. md5_hex_app_secret = hashlib.md5(
  148. ControlConf.aiot_appSecret.encode('utf-8')).hexdigest()
  149. # 生成Authorization
  150. authorization = 'Basic ' + base64.b64encode(
  151. (ControlConf.aiot_appid + ':' + md5_hex_app_secret).encode('utf-8')).decode('utf-8')
  152. return authorization
  153. except Exception as e:
  154. logger.info(e)
  155. return None
  156. # 生成password
  157. def _generate_password(self):
  158. try:
  159. appid = ControlConf.aiot_appid
  160. userid = self.machine_id
  161. secure_mode = 'hmac_sign'
  162. timestamp = int(time.time())
  163. to_encode_str = (appid + secure_mode +
  164. str(timestamp) + userid).encode('utf-8')
  165. # Base64解码userSecretKey
  166. decoded_user_secret = base64.b64decode(self.user_secretkey)
  167. # hmac_sha256签名
  168. hmac_encode_sign = hmac.new(
  169. decoded_user_secret, to_encode_str, hashlib.sha256)
  170. # Base64编码
  171. base64_encode_sign = base64.b64encode(
  172. hmac_encode_sign.digest()).decode('utf-8')
  173. # URL 编码
  174. url_encoded_sign = urllib.parse.quote(base64_encode_sign, safe='')
  175. password = f'appId={appid}&secureMode={secure_mode}&timestamp={timestamp}&userId={userid}&sign={url_encoded_sign}'
  176. return password
  177. except Exception as e:
  178. logger.info(e)
  179. traceback.print_exc()
  180. return None
  181. # 刷新access-token
  182. def refresh_access_token(self):
  183. """
  184. 如果后续接口返回401,则调用该接口刷新access-token
  185. """
  186. url = ControlConf.aiot_platform_host + '/cloud-api/oauth2/token'
  187. try:
  188. authorization = self._generate_authorization()
  189. request_headers = {
  190. 'Authorization': authorization,
  191. "Content-Type": "application/x-www-form-urlencoded",
  192. }
  193. requests_data = {
  194. "grant_type": 'refresh_token',
  195. "refresh_token": self.refresh_token
  196. }
  197. try:
  198. # logger.info(1111111,request_headers)
  199. # logger.info(2222222,requests_data)
  200. encoded_data = urllib.parse.urlencode(requests_data)
  201. response = requests.post(
  202. url, headers=request_headers, data=encoded_data, timeout=3)
  203. response_json = response.json()
  204. logger.info("1.3 刷新access-token:", response_json)
  205. if response_json['code'] == 0:
  206. self.access_token = response_json['data']['access_token']
  207. self.refresh_token = response_json['data']['refresh_token']
  208. return True, ''
  209. else:
  210. logger.info("响应失败:", response_json['msg'])
  211. return False, response_json['msg']
  212. except requests.exceptions.RequestException as e:
  213. logger.info("请求失败:", e)
  214. except Exception as e:
  215. traceback.print_exc()
  216. logger.info(e)
  217. # 连接物联平台中控系统,获取access-token
  218. def connect(self):
  219. # 一、获取用户密钥
  220. connect_flag, user_secret = self._get_platform_user_secretkey()
  221. if connect_flag is False:
  222. self.logger.error("1.1、获取用户密钥userSecretKey失败: %s" % user_secret)
  223. return
  224. else:
  225. # logger.info("1.1 成功获取用户密钥userSecretKey:%s" % str(user_secret))
  226. self.logger.info("1.1、成功获取用户密钥userSecretKey: %s" %
  227. str(user_secret))
  228. self.user_secretkey = user_secret
  229. # self.logger.info("一、获取用户密钥userSecretKey:%s"%self.user_secretkey)
  230. # 二、获取access-token和refresh-token
  231. connect_flag, access_token_data = self._get_access_token()
  232. if connect_flag is False:
  233. # self.logger.error("二、获取access-token失败:%s"%access_token_data)
  234. return
  235. else:
  236. # logger.info("1.2 成功获取access-token:%s" % str(access_token_data))
  237. self.logger.info("1.2 成功获取access-token: %s" %
  238. str(access_token_data))
  239. self.access_token = access_token_data['access_token']
  240. self.refresh_token = access_token_data['refresh_token']
  241. # self.logger.info("二、获取refresh-token:%s"%self.refresh_token)
  242. # 获取设备列表
  243. def get_device_list(self):
  244. url = ControlConf.aiot_platform_host + '/cloud-api/device/list'
  245. try:
  246. authorization = "Bearer " + self.access_token
  247. request_headers = {
  248. 'Authorization': authorization,
  249. }
  250. response = requests.get(url, headers=request_headers, timeout=3)
  251. response_json = response.json()
  252. logger.info("2.1 获取设备列表:", response_json)
  253. if response_json['code'] == 0:
  254. return True, response_json['data']
  255. elif response_json['code'] == 401:
  256. # 如果返回401,则刷新access-token
  257. self.refresh_access_token()
  258. return False, "token过期,已刷新,重新调用"
  259. else:
  260. logger.info("响应失败:", response_json['msg'])
  261. return False, response_json['msg']
  262. except requests.exceptions.RequestException as e:
  263. logger.info("请求失败:", e)
  264. # 获取产品资源
  265. def get_product_resource(self, product_id=''):
  266. url = ControlConf.aiot_platform_host + '/cloud-api/product/resource'
  267. try:
  268. authorization = "Bearer " + self.access_token
  269. request_headers = {
  270. 'Authorization': authorization,
  271. }
  272. params = {
  273. "productId": product_id
  274. }
  275. response = requests.get(
  276. url, headers=request_headers, params=params, timeout=3)
  277. response_json = response.json()
  278. logger.info("2.2 获取产品资源:", response_json)
  279. if response_json['code'] == 0:
  280. return True, response_json['data']
  281. elif response_json['code'] == 401:
  282. # 如果返回401,则刷新access-token
  283. self.refresh_access_token()
  284. return False, "token过期,已刷新,重新调用"
  285. else:
  286. logger.info("响应失败:", response_json['msg'])
  287. return False, response_json['msg']
  288. except requests.exceptions.RequestException as e:
  289. logger.info("请求失败:", e)
  290. # 控制设备
  291. def control_device(self, control_detailed_data, device_id='', product_id=''):
  292. """
  293. 控制设备
  294. :param device_id: 设备id
  295. :param product_id: 产品id
  296. :param control_detailed_data: 控制数据
  297. 示例:[{"resourceId":"property.power1","value":1},{"resourceId":"property.power2","value":1}]
  298. :return: True,''
  299. """
  300. url = ControlConf.aiot_platform_host + '/cloud-api/device/control'
  301. authorization = "Bearer " + self.access_token
  302. headers = {
  303. 'Authorization': authorization,
  304. 'Content-Type': 'application/json',
  305. }
  306. requests_data = {
  307. "productId": product_id,
  308. "deviceId": device_id,
  309. "data": control_detailed_data
  310. }
  311. def _send_request():
  312. response = requests.post(
  313. url, headers=headers, json=requests_data, timeout=3)
  314. return response.json()
  315. try:
  316. response_json = _send_request()
  317. logger.info("2.3 控制设备:", response_json)
  318. if response_json['code'] == 401:
  319. # 如果返回401,则刷新access-token
  320. self.refresh_access_token()
  321. logger.info("token过期,已刷新,重试中")
  322. response_json = _send_request()
  323. if response_json['code'] == 0:
  324. return True, response_json
  325. else:
  326. logger.info("响应失败:", response_json['msg'])
  327. return False, response_json
  328. except requests.exceptions.RequestException as e:
  329. logger.info("请求失败:", e)
  330. # 场景列表
  331. def get_scene_list(self):
  332. """
  333. :return: True,[{'acId': '338', 'name': '离家模式'}, {'acId': '389', 'name': '回家模式'}]
  334. """
  335. url = ControlConf.aiot_platform_host + '/cloud-api/scene/list'
  336. try:
  337. authorization = "Bearer " + self.access_token
  338. requests_headers = {
  339. 'Authorization': authorization,
  340. }
  341. response = requests.get(url, headers=requests_headers, timeout=3)
  342. response_json = response.json()
  343. logger.info("2.4 场景列表为:", response_json)
  344. if response_json['code'] == 0:
  345. return True, response_json['data']
  346. elif response_json['code'] == 401:
  347. # 如果返回401,则刷新access-token
  348. self.refresh_access_token()
  349. return False, "token过期,已刷新,重新调用"
  350. else:
  351. logger.info("响应失败:", response_json['msg'])
  352. return False, response_json['msg']
  353. except Exception as e:
  354. logger.info("请求失败:", e)
  355. return False, e
  356. # 对外接口,场景执行
  357. def execute_scene(self, acid_name=''):
  358. """
  359. :param acid_name: 场景名字
  360. :return: True,''
  361. """
  362. if self.scenes_id.get(acid_name, '') == '':
  363. return False, "场景不存在"
  364. acid = self.scenes_id.get(acid_name, '')
  365. url = ControlConf.aiot_platform_host + '/cloud-api/scene/control'
  366. authorization = "Bearer " + self.access_token
  367. headers = {
  368. 'Authorization': authorization,
  369. "Content-Type": "application/json",
  370. }
  371. data = {"acId": acid}
  372. def send_request():
  373. _response_json = requests.post(
  374. url, headers=headers, json=data, timeout=3)
  375. return _response_json.json()
  376. try:
  377. response_json = send_request()
  378. # logger.info("2.5 执行场景:", response_json)
  379. if response_json['code'] == 401:
  380. # 如果返回401,则刷新access-token
  381. self.refresh_access_token()
  382. # return False, "token过期,已刷新,重新调用"
  383. logger.info("token过期,已刷新,重试中...")
  384. response = send_request()
  385. response_json = response.json()
  386. if response_json['code'] == 0:
  387. return True, response_json['data']
  388. else:
  389. logger.info("响应失败:", response_json['msg'])
  390. return False, response_json['msg']
  391. except Exception as e:
  392. logger.info("请求失败:", e)
  393. return False, e
  394. # 对外接口,控制设备开关
  395. def control_device_power(self, device_name='', state=''):
  396. """
  397. 对外接口,控制设备开关
  398. :param device_name: 设备名称
  399. :param state: 开关状态
  400. :return: True,''
  401. """
  402. try:
  403. dev_dict = self.resource_info.get(device_name, '')
  404. logger.info("-----------%s-%s-----------" % (state, device_name))
  405. if dev_dict == '' or state not in ['open', 'close']:
  406. return False, "设备名称或状态不存在"
  407. else:
  408. # 控制数据
  409. control_detailed_data = [
  410. {"resourceId": dev_dict['resourceId'],
  411. "value": dev_dict[state]}
  412. ]
  413. res, res_data = self.control_device(device_id=dev_dict['deviceId'], product_id=dev_dict['productId'],
  414. control_detailed_data=control_detailed_data)
  415. return res, res_data
  416. except Exception as e:
  417. logger.info(e)
  418. return False, e
  419. def main(self, stop_event):
  420. try:
  421. logger.info("!!!!!!!!!!!可以控制家具!!!!!!!!!!!")
  422. except Exception as e:
  423. self.logger.error(f"与物联平台中控系统交互出错: {e}")
  424. aiot_controller = ControlAIotPlatform()
  425. if __name__ == "__main__":
  426. print(home_path)
  427. # 获取设备列表
  428. # aiot_controller.get_device_list()
  429. # 获取具体
  430. # res, resdata = aiot_controller.get_product_resource("RSD00005")
  431. # if res is True:
  432. # logger.info("获取资源RSD00003成功!",resdata)
  433. # else:
  434. # logger.info("获取资源RSD00003失败!",resdata)
  435. # aiot_controller.get_scene_list()
  436. # aiot_controller.execute_scene('scene_001')
  437. # 控制设备先写死测试
  438. # {
  439. # 'productId': 'KXYP79V2',
  440. # 'deviceId': '0001200d90395efffe80c4ee',
  441. # 'deviceName': '卧室灯带',
  442. # 'place': '卧室',
  443. # 'resourceId': 'power2',
  444. # 'connected': True
  445. # }
  446. # control_data = [
  447. # # {"resourceId": "power2", "value": False},
  448. # # {"resourceId": "power1", "value": False},
  449. # # 窗帘
  450. # {"resourceId": "work1.work1", "value": "2"}
  451. # ]
  452. # res, resdata = aiot_controller.control_device(device_id='ATARWSA40001B8D61AA720B0', product_id='RSD00005',
  453. # control_detailed_data=control_data)
  454. # if res is True:
  455. # print("客厅筒灯!",resdata)
  456. # else:
  457. # print("客厅筒灯!",resdata)