aiui_process.py 899 B

123456789101112131415161718192021222324252627282930313233
  1. """
  2. AIUI消息处理策略
  3. """
  4. import zlib
  5. from utils.logger import logger
  6. from .base_strategy import ProcessStrategy
  7. class AiuiMessageProcess(ProcessStrategy):
  8. """AIUI消息处理类"""
  9. def process(self, client_socket, data: bytes) -> tuple[bool, bytes]:
  10. """
  11. 处理AIUI消息(解压缩)
  12. Args:
  13. client_socket: 客户端socket(未使用)
  14. data: 压缩的数据
  15. Returns:
  16. tuple[bool, bytes]: (成功标志, 解压后的数据)
  17. """
  18. if not data:
  19. return False, b''
  20. try:
  21. decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
  22. output = decompressor.decompress(data)
  23. output += decompressor.flush()
  24. return True, output
  25. except zlib.error as e:
  26. logger.info(f"AIUI消息解压失败: {e}")
  27. return False, b''