| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324 |
- '''
- Author: zhaoyong 77912776@qq.com
- Date: 2025-07-02 06:34:52
- LastEditTime: 2025-07-19 12:24:11
- LastEditors: zhaoyong 77912776@qq.com
- FilePath: \robot_ai\src\aiui\pyaiui.py
- Description: Python wrapper classes that call the AIUI C++ interface
- '''
- #!/usr/bin/env python3
- # coding=utf-8
- import ctypes
- import os
- from .pyAIUIConstant import AIUIConstant
- import abc
- import sys
- import platform
- from pathlib import Path
# Handle to the native AIUI shared library; bound by the platform check below.
AIUI_DLL = None

# Absolute, fully resolved location of this module.
script_path = Path(__file__).resolve()

# Project root: walk up from this file through src/aiui -> src -> root.
BASE_DIR = script_path.parent.parent.parent

if platform.system() != "Windows":
    # Every non-Windows platform loads the shared object from the "arm" folder.
    AIUI_DLL = ctypes.cdll.LoadLibrary(
        str(BASE_DIR / "config" / "aiui" / "arm" / "libaiui.so"))
else:
    # Windows loads the x64 DLL through ctypes.windll.
    AIUI_DLL = ctypes.windll.LoadLibrary(
        str(BASE_DIR / "config" / "aiui" / "x64" / "aiui.dll"))
def aiui_get_version():
    """Return the version string reported by the native AIUI library.

    Returns:
        The UTF-8 decoded result of the library's ``aiui_get_version`` export,
        or an empty string when that export returns a NULL pointer. When the
        library handle is ``None`` or lacks the export, a diagnostic message
        is returned instead of a version.
    """
    if AIUI_DLL is None:
        return "AIUI DLL not loaded"
    if not hasattr(AIUI_DLL, 'aiui_get_version'):
        return "aiui_get_version function not found in DLL"

    get_version = AIUI_DLL.aiui_get_version
    get_version.restype = ctypes.c_char_p
    raw = get_version()
    # With a c_char_p restype a NULL pointer comes back as None, and
    # str(None, encoding=...) would raise TypeError.
    if raw is None:
        return ""
    return raw.decode("utf-8")
class IDataBundle:
    """Typed accessors over a native AIUI data-bundle handle.

    The constructor looks up the ``aiui_db_int``, ``aiui_db_long``,
    ``aiui_db_string`` and ``aiui_db_binary`` exports on ``AIUI_DLL`` and
    stores each one as an instance attribute of the same name. An export that
    is missing (or a library handle that is ``None``) is stored as ``None``,
    which makes the matching getter return its fallback value.
    """

    # Opaque handle of the wrapped native bundle.
    aiui_db = None

    def __init__(self, aiui_db: ctypes.c_void_p):
        """Wrap ``aiui_db`` and bind the four native accessor functions."""
        self.aiui_db = aiui_db
        # All accessors take (bundle, key, third argument); only the return
        # type and the type of the third argument differ.
        self.aiui_db_int = self._bind(
            "aiui_db_int", ctypes.c_int, ctypes.c_void_p)
        self.aiui_db_long = self._bind(
            "aiui_db_long", ctypes.c_long, ctypes.c_void_p)
        self.aiui_db_string = self._bind(
            "aiui_db_string", ctypes.c_char_p, ctypes.c_char_p)
        self.aiui_db_binary = self._bind(
            "aiui_db_binary", ctypes.c_void_p, ctypes.c_void_p)

    @staticmethod
    def _bind(symbol, restype, third_argtype):
        """Return the configured export ``symbol``, or None when unavailable."""
        if AIUI_DLL is None or not hasattr(AIUI_DLL, symbol):
            return None
        func = getattr(AIUI_DLL, symbol)
        func.restype = restype
        func.argtypes = [ctypes.c_void_p, ctypes.c_char_p, third_argtype]
        return func

    def _read_binary(self, key: str) -> bytes:
        """Call the bound binary accessor and copy its payload into ``bytes``.

        Returns ``b""`` when the native call yields a NULL pointer or reports
        a non-positive length, instead of dereferencing the pointer.
        """
        length = ctypes.c_int(0)
        address = self.aiui_db_binary(
            self.aiui_db,
            ctypes.c_char_p(key.encode("utf-8")),
            ctypes.pointer(length),
        )
        if not address or length.value <= 0:
            return b""
        return ctypes.string_at(address, length.value)

    def getInt(self, key: str, defaultVal: int):
        """Return the int stored under ``key``.

        ``defaultVal`` is returned as-is when the accessor is unbound;
        otherwise a pointer to a C int holding it is passed to the native call.
        """
        if self.aiui_db_int is None:
            return defaultVal
        fallback = ctypes.c_int(defaultVal)
        result = self.aiui_db_int(
            self.aiui_db,
            ctypes.c_char_p(key.encode("utf-8")),
            ctypes.pointer(fallback),
        )
        return int(result)

    def getLong(self, key: str, defaultVal: int):
        """Return the long stored under ``key``.

        ``defaultVal`` is returned as-is when the accessor is unbound;
        otherwise a pointer to a C long holding it is passed to the native call.
        """
        if self.aiui_db_long is None:
            return defaultVal
        # NOTE(review): c_long width is platform dependent -- confirm it
        # matches the native signature on every supported target.
        fallback = ctypes.c_long(defaultVal)
        result = self.aiui_db_long(
            self.aiui_db,
            ctypes.c_char_p(key.encode("utf-8")),
            ctypes.pointer(fallback),
        )
        return int(result)

    def getString(self, key: str, defaultVal: str):
        """Return the UTF-8 string stored under ``key``.

        ``defaultVal`` is returned when the accessor is unbound or when the
        native call returns a NULL pointer.
        """
        if self.aiui_db_string is None:
            return defaultVal
        raw = self.aiui_db_string(
            self.aiui_db,
            ctypes.c_char_p(key.encode("utf-8")),
            ctypes.c_char_p(defaultVal.encode("utf-8")),
        )
        # A NULL c_char_p result arrives as None; decoding it would raise.
        if raw is None:
            return defaultVal
        return raw.decode("utf-8")

    def getBinary(self, key: str):
        """Return the binary payload stored under ``key`` as ``bytes``.

        ``b""`` is returned when the accessor is unbound, the native call
        returns NULL, or the reported length is not positive.
        """
        if self.aiui_db_binary is None:
            return b""
        return self._read_binary(key)

    def getBinaryAsStr(self, key: str):
        """Return the binary payload under ``key`` decoded as UTF-8 text.

        The full reported length is read (nothing is trimmed up front, so a
        JSON payload is not truncated); a single trailing NUL byte, if
        present, is removed before decoding. Any failure yields ``""``.
        """
        try:
            if self.aiui_db_binary is None:
                return ""
            raw = self._read_binary(key)
            # Drop one NUL terminator when the payload carries it.
            if raw.endswith(b"\x00"):
                raw = raw[:-1]
            return raw.decode("utf-8")
        except Exception:
            # Deliberate best effort: callers get an empty string rather than
            # an exception when the payload cannot be read or decoded.
            return ""
class IAIUIEvent:
    """Accessor wrapper around a native AIUI event handle.

    When ``AIUI_DLL`` is not ``None`` the constructor binds the
    ``aiui_event_type``, ``aiui_event_arg1``, ``aiui_event_arg2``,
    ``aiui_event_info`` and ``aiui_event_databundle`` exports as instance
    attributes of the same names; each takes the event handle as its only
    argument.
    """

    # Opaque handle of the wrapped native event.
    aiui_event = None

    def __init__(self, aiui_event):
        """Wrap ``aiui_event`` and bind the native event accessors."""
        self.aiui_event = aiui_event
        if AIUI_DLL is not None:
            bindings = (
                ("aiui_event_type", ctypes.c_int),
                ("aiui_event_arg1", ctypes.c_int),
                ("aiui_event_arg2", ctypes.c_int),
                ("aiui_event_info", ctypes.c_char_p),
                ("aiui_event_databundle", ctypes.c_void_p),
            )
            for symbol, restype in bindings:
                func = getattr(AIUI_DLL, symbol)
                func.restype = restype
                func.argtypes = [ctypes.c_void_p]
                setattr(self, symbol, func)

    def getEventType(self) -> int:
        """Return the result of ``aiui_event_type`` for this event."""
        return self.aiui_event_type(self.aiui_event)

    def getArg1(self) -> int:
        """Return the result of ``aiui_event_arg1`` for this event."""
        return self.aiui_event_arg1(self.aiui_event)

    def getArg2(self) -> int:
        """Return the result of ``aiui_event_arg2`` for this event."""
        return self.aiui_event_arg2(self.aiui_event)

    def getInfo(self) -> str:
        """Return the event info decoded as UTF-8, or ``""`` for a NULL result."""
        raw = self.aiui_event_info(self.aiui_event)
        # A NULL c_char_p result arrives as None; decoding it would raise.
        if raw is None:
            return ""
        return raw.decode("utf-8")

    def getData(self) -> "IDataBundle":
        """Return an ``IDataBundle`` wrapping the handle from ``aiui_event_databundle``."""
        bundle_handle = self.aiui_event_databundle(self.aiui_event)
        return IDataBundle(bundle_handle)
class Buffer:
    """Wrapper around a native AIUI buffer handle."""

    # Opaque handle of the wrapped native buffer (None means no buffer).
    aiui_buf = None

    def __init__(self, aiui_buf):
        """Wrap an existing native buffer handle."""
        self.aiui_buf = aiui_buf

    @staticmethod
    def create(dataBytearray: bytes):
        """Create a native buffer from ``dataBytearray``.

        Args:
            dataBytearray: The payload. Any bytes-like object is accepted
                (``bytes``, ``bytearray``, ``memoryview``); non-``bytes``
                input is copied into ``bytes`` first because ``c_char_p``
                only accepts ``bytes``.

        Returns:
            A ``Buffer`` wrapping the handle returned by
            ``aiui_create_buffer_from_data``, or ``None`` when ``AIUI_DLL``
            is ``None``.
        """
        if AIUI_DLL is None:
            return None
        create_buffer = AIUI_DLL.aiui_create_buffer_from_data
        create_buffer.restype = ctypes.c_void_p
        create_buffer.argtypes = [ctypes.c_char_p, ctypes.c_size_t]

        if isinstance(dataBytearray, bytes):
            payload = dataBytearray
        else:
            # memoryview() rejects non-buffer objects (e.g. int), so this does
            # not silently turn an integer into a zero-filled payload.
            payload = bytes(memoryview(dataBytearray))

        handle = create_buffer(
            ctypes.c_char_p(payload), ctypes.c_size_t(len(payload)))
        return Buffer(handle)
class IAIUIMessage:
    """Wrapper around a native AIUI message handle."""

    # Opaque handle of the wrapped native message.
    aiui_msg = None

    def __init__(self, aiui_msg):
        """Wrap an existing native message handle."""
        self.aiui_msg = aiui_msg

    @staticmethod
    def create(msgType: "AIUIConstant", arg1=0, arg2=0, params="", data=None):
        """Create a native message through ``aiui_msg_create``.

        Args:
            msgType: Message type; its ``.value`` is passed as a C int.
            arg1: First integer argument.
            arg2: Second integer argument.
            params: Parameter string, sent UTF-8 encoded.
            data: Optional ``Buffer`` whose ``aiui_buf`` handle is attached.
                ``None`` (the default) sends a NULL buffer, which is what the
                previous ``Buffer(None)`` default produced.

        Returns:
            An ``IAIUIMessage`` wrapping the returned handle, or ``None`` when
            ``AIUI_DLL`` is ``None``.
        """
        if AIUI_DLL is None:
            return None
        create_msg = AIUI_DLL.aiui_msg_create
        create_msg.restype = ctypes.c_void_p
        create_msg.argtypes = [ctypes.c_int, ctypes.c_int,
                               ctypes.c_int, ctypes.c_char_p, ctypes.c_void_p]

        # Resolve the buffer at call time instead of building a Buffer in the
        # signature, where it would be evaluated once at definition time.
        buffer_handle = None if data is None else data.aiui_buf
        handle = create_msg(
            ctypes.c_int(msgType.value),
            ctypes.c_int(arg1),
            ctypes.c_int(arg2),
            ctypes.c_char_p(params.encode("utf-8")),
            buffer_handle,
        )
        return IAIUIMessage(handle)

    def destroy(self):
        """Pass this message's handle to ``aiui_msg_destroy``.

        Returns the native call's result, or ``None`` when ``AIUI_DLL`` is
        ``None``.
        """
        if AIUI_DLL is None:
            return None
        destroy_msg = AIUI_DLL.aiui_msg_destroy
        destroy_msg.argtypes = [ctypes.c_void_p]
        return destroy_msg(self.aiui_msg)
class AIUIEventListener:
    """Base class for objects that receive AIUI events.

    Subclasses override ``onEvent``; the base implementation does nothing.
    """

    @abc.abstractmethod
    def onEvent(self, ev: IAIUIEvent):
        """Handle one event ``ev``; the default returns ``None``."""
        return None
def eventCallback(obj: AIUIEventListener):
    """Build a two-argument callable that forwards events to ``obj``.

    The returned function wraps its first argument in ``IAIUIEvent`` and
    hands it to ``obj.onEvent``; its second argument is ignored.
    """
    def wrapper(ev: ctypes.c_void_p, data: ctypes.c_void_p):
        event = IAIUIEvent(ev)
        obj.onEvent(event)

    return wrapper
class IAIUIAgent:
    """Wrapper around a native AIUI agent handle.

    The constructor binds ``aiui_agent_send_message`` and
    ``aiui_agent_destroy`` from ``AIUI_DLL`` as instance attributes, or stores
    ``None`` for both when ``AIUI_DLL`` is ``None``.
    """

    # Opaque handle of the native agent.
    aiui_agent = None
    # Python callable produced by eventCallback() for the listener.
    ListenerWarpper = None
    # ctypes callback object built around ListenerWarpper.
    AIUIListenerCallback = None

    def __init__(self, aiui_agent):
        """Wrap ``aiui_agent`` and bind the native agent functions."""
        self.aiui_agent = aiui_agent
        if AIUI_DLL is None:
            self.aiui_agent_send_message = None
            self.aiui_agent_destroy = None
            return
        self.aiui_agent_send_message = AIUI_DLL.aiui_agent_send_message
        self.aiui_agent_send_message.argtypes = [
            ctypes.c_void_p, ctypes.c_void_p]
        self.aiui_agent_destroy = AIUI_DLL.aiui_agent_destroy
        self.aiui_agent_destroy.argtypes = [ctypes.c_void_p]

    def sendMessage(self, msg: "IAIUIMessage"):
        """Send ``msg`` to the agent.

        Returns the native call's result, or ``None`` when the send function
        is unbound.
        """
        if self.aiui_agent_send_message is None:
            return None
        return self.aiui_agent_send_message(self.aiui_agent, msg.aiui_msg)

    def destroy(self):
        """Destroy the native agent and drop the callback references.

        The native destroy function is only called while a handle is held, so
        calling ``destroy`` again (or on an agent that has no handle) does not
        pass NULL to the library.
        """
        if self.aiui_agent_destroy is not None and self.aiui_agent is not None:
            self.aiui_agent_destroy(self.aiui_agent)
        self.AIUIListenerCallback = None
        self.ListenerWarpper = None
        self.aiui_agent = None

    @staticmethod
    def createAgent(params: str, listener):
        """Create an agent through ``aiui_agent_create``.

        Args:
            params: Parameter string, sent UTF-8 encoded.
            listener: Object whose ``onEvent`` receives events via
                ``eventCallback``.

        Returns:
            An ``IAIUIAgent`` holding the returned handle (which is ``None``
            if the native call returns NULL), or ``None`` when ``AIUI_DLL``
            is ``None``.
        """
        if AIUI_DLL is None:
            return None
        create_agent = AIUI_DLL.aiui_agent_create
        create_agent.argtypes = [
            ctypes.c_char_p, ctypes.c_void_p, ctypes.c_void_p]
        create_agent.restype = ctypes.c_void_p

        agent = IAIUIAgent(None)
        # Both the Python wrapper and the ctypes callback are stored on the
        # agent so they stay referenced while the agent is alive.
        agent.ListenerWarpper = eventCallback(listener)
        callback_type = ctypes.CFUNCTYPE(
            None, ctypes.c_void_p, ctypes.c_void_p)
        agent.AIUIListenerCallback = callback_type(agent.ListenerWarpper)
        agent.aiui_agent = create_agent(
            ctypes.c_char_p(params.encode('utf-8')),
            agent.AIUIListenerCallback,
            None,
        )
        return agent
class AIUISetting:
    """Static helpers that forward global settings to the native AIUI library."""

    @staticmethod
    def setSystemInfo(key: str, val: str):
        """Forward a key/value pair to ``aiui_set_system_info``.

        Both strings are sent UTF-8 encoded. Returns the native call's result,
        or ``None`` when ``AIUI_DLL`` is ``None``.
        """
        if AIUI_DLL is None:
            return None
        set_info = AIUI_DLL.aiui_set_system_info
        set_info.argtypes = [ctypes.c_char_p, ctypes.c_char_p]
        encoded_key = ctypes.c_char_p(key.encode("utf-8"))
        encoded_val = ctypes.c_char_p(val.encode("utf-8"))
        return set_info(encoded_key, encoded_val)

    @staticmethod
    def setMscDir(szDir: str):
        """Forward the directory ``szDir`` to ``aiui_set_msc_dir``.

        The path is sent UTF-8 encoded. Returns the native boolean result, or
        ``False`` when ``AIUI_DLL`` is ``None``.
        """
        if AIUI_DLL is None:
            return False
        set_dir = AIUI_DLL.aiui_set_msc_dir
        set_dir.restype = ctypes.c_bool
        set_dir.argtypes = [ctypes.c_char_p]
        encoded_dir = ctypes.c_char_p(szDir.encode('utf-8'))
        return set_dir(encoded_dir)
|