pyaiui.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. '''
  2. Author: zhaoyong 77912776@qq.com
  3. Date: 2025-07-02 06:34:52
  4. LastEditTime: 2025-07-19 12:24:11
  5. LastEditors: zhaoyong 77912776@qq.com
  6. FilePath: \robot_ai\src\aiui\pyaiui.py
  7. Description: aiui的python调用c++接口类
  8. '''
  9. #!/usr/bin python3
  10. # coding=utf-8
  11. import ctypes
  12. import os
  13. from .pyAIUIConstant import AIUIConstant
  14. import abc
  15. import sys
  16. import platform
  17. from pathlib import Path
  18. AIUI_DLL = None
  19. # 当前脚本路径
  20. script_path = Path(__file__).resolve()
  21. # 项目根目录(向上两级:src/aiui -> src -> 根目录)
  22. BASE_DIR = script_path.parent.parent.parent
  23. if platform.system() == "Windows":
  24. AIUI_DLL = ctypes.windll.LoadLibrary(
  25. str(BASE_DIR / "config" / "aiui" / "x64" / "aiui.dll"))
  26. else:
  27. AIUI_DLL = ctypes.cdll.LoadLibrary(
  28. str(BASE_DIR / "config" / "aiui" / "arm" / "libaiui.so"))
  29. def aiui_get_version():
  30. if AIUI_DLL is None:
  31. return "AIUI DLL not loaded"
  32. if not hasattr(AIUI_DLL, 'aiui_get_version'):
  33. return "aiui_get_version function not found in DLL"
  34. _f = AIUI_DLL.aiui_get_version
  35. _f.restype = ctypes.c_char_p
  36. s = _f()
  37. return str(s, encoding="utf-8")
  38. class IDataBundle:
  39. aiui_db = None
  40. def __init__(self, aiui_db: ctypes.c_void_p):
  41. self.aiui_db = aiui_db
  42. if AIUI_DLL is not None and hasattr(AIUI_DLL, 'aiui_db_int'):
  43. self.aiui_db_int = AIUI_DLL.aiui_db_int
  44. self.aiui_db_int.restype = ctypes.c_int
  45. self.aiui_db_int.argtypes = [
  46. ctypes.c_void_p, ctypes.c_char_p, ctypes.c_void_p]
  47. else:
  48. self.aiui_db_int = None
  49. if AIUI_DLL is not None and hasattr(AIUI_DLL, 'aiui_db_long'):
  50. self.aiui_db_long = AIUI_DLL.aiui_db_long
  51. self.aiui_db_long.restype = ctypes.c_long
  52. self.aiui_db_long.argtypes = [
  53. ctypes.c_void_p, ctypes.c_char_p, ctypes.c_void_p]
  54. else:
  55. self.aiui_db_long = None
  56. if AIUI_DLL is not None and hasattr(AIUI_DLL, 'aiui_db_string'):
  57. self.aiui_db_string = AIUI_DLL.aiui_db_string
  58. self.aiui_db_string.restype = ctypes.c_char_p
  59. self.aiui_db_string.argtypes = [
  60. ctypes.c_void_p, ctypes.c_char_p, ctypes.c_char_p]
  61. else:
  62. self.aiui_db_string = None
  63. if AIUI_DLL is not None and hasattr(AIUI_DLL, 'aiui_db_binary'):
  64. self.aiui_db_binary = AIUI_DLL.aiui_db_binary
  65. self.aiui_db_binary.restype = ctypes.c_void_p
  66. self.aiui_db_binary.argtypes = [
  67. ctypes.c_void_p, ctypes.c_char_p, ctypes.c_void_p]
  68. else:
  69. self.aiui_db_binary = None
  70. def getInt(self, key: str, defaultVal: int):
  71. if self.aiui_db_int is None:
  72. return defaultVal
  73. return int(self.aiui_db_int(self.aiui_db, ctypes.c_char_p(key.encode("utf-8")),
  74. ctypes.pointer(ctypes.c_int(defaultVal))))
  75. def getLong(self, key: str, defaultVal: int):
  76. if self.aiui_db_long is None:
  77. return defaultVal
  78. return int(self.aiui_db_long(self.aiui_db, ctypes.c_char_p(key.encode("utf-8")),
  79. ctypes.pointer(ctypes.c_long(defaultVal))))
  80. def getString(self, key: str, defaultVal: str):
  81. if self.aiui_db_string is None:
  82. return defaultVal
  83. s = self.aiui_db_string(self.aiui_db, ctypes.c_char_p(key.encode("utf-8")),
  84. ctypes.c_char_p(defaultVal.encode("utf-8")))
  85. return str(s, encoding="utf-8")
  86. def getBinary(self, key: str):
  87. if self.aiui_db_binary is None:
  88. return b""
  89. datalen = ctypes.c_int(0)
  90. s = self.aiui_db_binary(self.aiui_db, ctypes.c_char_p(
  91. key.encode("utf-8")), ctypes.pointer(datalen))
  92. ArrayType = ctypes.c_char * datalen.value
  93. pa = ctypes.cast(s, ctypes.POINTER(ArrayType))
  94. return bytes(pa.contents)
  95. def getBinaryAsStr(self, key: str):
  96. """获取二进制数据并转换为字符串,修复JSON截断问题"""
  97. try:
  98. if self.aiui_db_binary is None:
  99. return ""
  100. datalen = ctypes.c_int(0)
  101. binary = self.aiui_db_binary(self.aiui_db, ctypes.c_char_p(
  102. key.encode('utf-8')), ctypes.pointer(datalen))
  103. if datalen.value <= 0:
  104. return ""
  105. # 移除-1操作,避免截断JSON
  106. arrayType = ctypes.c_char * datalen.value
  107. pointArray = ctypes.cast(binary, ctypes.POINTER(arrayType))
  108. # 获取字节数据并转换为字符串
  109. raw_bytes = bytes(pointArray.contents)
  110. # 处理可能的空字节终止符
  111. if raw_bytes and raw_bytes[-1] == 0:
  112. raw_bytes = raw_bytes[:-1]
  113. return str(raw_bytes, encoding='utf-8')
  114. except Exception as e:
  115. # logger.info(f"getBinaryAsStr error: {e}")
  116. return ""
  117. class IAIUIEvent:
  118. aiui_event = None
  119. def __init__(self, aiui_event):
  120. self.aiui_event = aiui_event
  121. if AIUI_DLL is not None:
  122. self.aiui_event_type = AIUI_DLL.aiui_event_type
  123. self.aiui_event_type.restype = ctypes.c_int
  124. self.aiui_event_type.argtypes = [ctypes.c_void_p]
  125. self.aiui_event_arg1 = AIUI_DLL.aiui_event_arg1
  126. self.aiui_event_arg1.restype = ctypes.c_int
  127. self.aiui_event_arg1.argtypes = [ctypes.c_void_p]
  128. self.aiui_event_arg2 = AIUI_DLL.aiui_event_arg2
  129. self.aiui_event_arg2.restype = ctypes.c_int
  130. self.aiui_event_arg2.argtypes = [ctypes.c_void_p]
  131. self.aiui_event_info = AIUI_DLL.aiui_event_info
  132. self.aiui_event_info.restype = ctypes.c_char_p
  133. self.aiui_event_info.argtypes = [ctypes.c_void_p]
  134. self.aiui_event_databundle = AIUI_DLL.aiui_event_databundle
  135. self.aiui_event_databundle.restype = ctypes.c_void_p
  136. self.aiui_event_databundle.argtypes = [ctypes.c_void_p]
  137. def getEventType(self) -> int:
  138. return self.aiui_event_type(self.aiui_event)
  139. def getArg1(self) -> int:
  140. return self.aiui_event_arg1(self.aiui_event)
  141. def getArg2(self) -> int:
  142. return self.aiui_event_arg2(self.aiui_event)
  143. def getInfo(self) -> str:
  144. s = self.aiui_event_info(self.aiui_event)
  145. return str(s, encoding="utf-8")
  146. def getData(self) -> IDataBundle:
  147. db = self.aiui_event_databundle(self.aiui_event)
  148. return IDataBundle(db)
  149. class Buffer:
  150. aiui_buf = None
  151. def __init__(self, aiui_buf):
  152. self.aiui_buf = aiui_buf
  153. @staticmethod
  154. def create(dataBytearray: bytes):
  155. if AIUI_DLL is None:
  156. return None
  157. _f = AIUI_DLL.aiui_create_buffer_from_data
  158. _f.restype = ctypes.c_void_p
  159. _f.argtypes = [ctypes.c_char_p, ctypes.c_size_t]
  160. return Buffer(_f(ctypes.c_char_p(dataBytearray), ctypes.c_size_t(len(dataBytearray))))
  161. class IAIUIMessage:
  162. aiui_msg = None
  163. def __init__(self, aiui_msg):
  164. self.aiui_msg = aiui_msg
  165. @staticmethod
  166. def create(msgType: AIUIConstant, arg1=0, arg2=0, params="", data=Buffer(None)):
  167. if AIUI_DLL is None:
  168. return None
  169. _f = AIUI_DLL.aiui_msg_create
  170. _f.restype = ctypes.c_void_p
  171. _f.argtypes = [ctypes.c_int, ctypes.c_int,
  172. ctypes.c_int, ctypes.c_char_p, ctypes.c_void_p]
  173. return IAIUIMessage(
  174. _f(ctypes.c_int(msgType.value), ctypes.c_int(arg1), ctypes.c_int(arg2),
  175. ctypes.c_char_p(params.encode("utf-8")), data.aiui_buf)
  176. )
  177. def destroy(self):
  178. if AIUI_DLL is None:
  179. return None
  180. _f = AIUI_DLL.aiui_msg_destroy
  181. _f.argtypes = [ctypes.c_void_p]
  182. return _f(self.aiui_msg)
  183. class AIUIEventListener:
  184. @abc.abstractmethod
  185. def onEvent(self, ev: IAIUIEvent):
  186. pass
  187. def eventCallback(obj: AIUIEventListener):
  188. def wrapper(ev: ctypes.c_void_p, data: ctypes.c_void_p):
  189. obj.onEvent(IAIUIEvent(ev))
  190. return wrapper
  191. class IAIUIAgent:
  192. aiui_agent = None
  193. ListenerWarpper = None
  194. AIUIListenerCallback = None
  195. def __init__(self, aiui_agent):
  196. self.aiui_agent = aiui_agent
  197. if AIUI_DLL is not None:
  198. self.aiui_agent_send_message = AIUI_DLL.aiui_agent_send_message
  199. self.aiui_agent_send_message.argtypes = [
  200. ctypes.c_void_p, ctypes.c_void_p]
  201. self.aiui_agent_destroy = AIUI_DLL.aiui_agent_destroy
  202. self.aiui_agent_destroy.argtypes = [ctypes.c_void_p]
  203. else:
  204. self.aiui_agent_send_message = None
  205. self.aiui_agent_destroy = None
  206. def sendMessage(self, msg: IAIUIMessage):
  207. if self.aiui_agent_send_message is not None:
  208. return self.aiui_agent_send_message(self.aiui_agent, msg.aiui_msg)
  209. return None
  210. def destroy(self):
  211. if self.aiui_agent_destroy is not None:
  212. self.aiui_agent_destroy(self.aiui_agent)
  213. self.AIUIListenerCallback = None
  214. self.ListenerWarpper = None
  215. self.aiui_agent = None
  216. @staticmethod
  217. def createAgent(params: str, listener):
  218. if AIUI_DLL is None:
  219. return None
  220. _f = AIUI_DLL.aiui_agent_create
  221. _f.argtypes = [ctypes.c_char_p, ctypes.c_void_p, ctypes.c_void_p]
  222. _f.restype = ctypes.c_void_p
  223. agent = IAIUIAgent(None)
  224. agent.ListenerWarpper = eventCallback(listener)
  225. agent.AIUIListenerCallback = ctypes.CFUNCTYPE(
  226. None, ctypes.c_void_p, ctypes.c_void_p)(agent.ListenerWarpper)
  227. agent.aiui_agent = _f(ctypes.c_char_p(
  228. params.encode('utf-8')), agent.AIUIListenerCallback, None)
  229. return agent
  230. class AIUISetting:
  231. @staticmethod
  232. def setSystemInfo(key: str, val: str):
  233. if AIUI_DLL is None:
  234. return None
  235. _f = AIUI_DLL.aiui_set_system_info
  236. _f.argtypes = [ctypes.c_char_p, ctypes.c_char_p]
  237. return _f(ctypes.c_char_p(key.encode("utf-8")), ctypes.c_char_p(val.encode("utf-8")))
  238. @staticmethod
  239. def setMscDir(szDir: str):
  240. if AIUI_DLL is None:
  241. return False
  242. _f = AIUI_DLL.aiui_set_msc_dir
  243. _f.restype = ctypes.c_bool
  244. _f.argtypes = [ctypes.c_char_p]
  245. return _f(ctypes.c_char_p(szDir.encode('utf-8')))