基于QtPy (PySide6) 的PLC-HMI工程项目(八)在上位机中解析上行报文

张开发
2026/4/16 2:33:45 15 分钟阅读

分享文章

基于QtPy (PySide6) 的PLC-HMI工程项目(八)在上位机中解析上行报文
来自PLC的上行报文具有以下数据格式帧头2字节 区域变量Area、DBnum、offset、byteCount) 本区域数据内容 byteCount长度的字节 下一区域变量 下一区域数据内容 。。。 帧尾2字节 CRR162字节数据解析流程当接收到上行报文后首先对帧头和帧尾进行校验如果校验不过就舍弃帧数据。然后经过CRC校验确保所有数据无误将帧数据保存。一、创建一个与PLC中类似的区域数据类型from typing import NamedTuple class AreaVar(NamedTuple): area: str # 数据区域I、Q、M、DB DBnum: int # DB号 offset: int # 字节偏移 byte_count: int # 字节数 content: bytes # 数据内容二、解析上行数据之前的PLC范例中获取到过一个上行帧的数据内容以这帧数据为例演示如何解析上行数据import crcmod from AreaVar import AreaVar import numpy as np # 配置区域 SERVER_IP 127.0.0.1 # 下位机服务器IP SERVER_PORT 2001 # 下位机(PLC)端口 RECONNECT_INTERVAL 3000 # 断线重连间隔毫秒 HEARTBEAT_INTERVAL 1000 # 心跳包间隔毫秒 HEART_CODE bXXYY # 心跳代码 UP_FRAME_HEAD bAA # 上行帧头 UP_FRAME_END bBB # 上行帧尾 DOWN_FRAME_HEAD bXX # 下行帧头 DOWN_FRAME_END bYY # 下行帧尾 crc16 crcmod.predefined.Crc(modbus) AREA_DICT { 0x81: I, 0x82: Q, 0x83: M, 0x84: DB } # 测试数据和 PLC 一样 data bytes( [0x41, 0x41, 0x84, 0x84, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x11, 0x22, 0x33, 0x44, 0x55, 0x42, 0x42, 0xE3, 0xA5]) def parse_up_datas(datas: bytes): 解析上行数据 :param datas: PLC发出的上行数据 :return: 自定义区域变量AreaVar的列表 if len(datas) 21: print(f❌数据校验失败) return False head datas[0:2] # 帧头 tail datas[-4:-2] # 帧尾 if head ! UP_FRAME_HEAD or tail ! UP_FRAME_END: print(f❌数据校验失败) return False # CRC crc_int int.from_bytes(datas[-2:], byteorderbig) crc16.update(datas[:-2]) crc_result crc16.crcValue if crc_result ! crc_int: print(f❌数据校验失败) return False # 截取上行数据的有效部分 up_data datas[2:-4] pos 0 # 定义一个指针用来遍历字节 AreaVars [] # 输出的变量列表 # 解析数据 while pos len(up_data): try: area AREA_DICT[up_data[pos]] except KeyError: print(f❌数据中区域代码错误) return False pos 2 DBnum int.from_bytes(up_data[pos:pos4], byteorderbig) pos 4 offset int.from_bytes(up_data[pos:pos4], byteorderbig) pos 4 byte_count int.from_bytes(up_data[pos:pos4], byteorderbig) pos 4 bytes_out up_data[pos:posbyte_count] pos byte_count AreaVars.append(AreaVar(area, DBnum, offset, byte_count, bytes_out)) if pos len(up_data): return AreaVars else: print(f❌数据长度错误解析失败) return False vas parse_up_datas(data) print(vas) # [AreaVar(areaDB, DBnum1, offset0, byte_count5, contentb\x113DU)] print(vas[0].area) # DB print(vas[0].DBnum) # 1 print(vas[0].offset) # 0 print(vas[0].byte_count) # 5 print(vas[0].content.hex()) # 1122334455三、创建类似于PLC的存储区以及读写方法import numpy as np from typing import NamedTuple class AreaVar(NamedTuple): area: str # I、Q、M、DB DBnum: int # DB号I/Q/M 填 0 offset: int # 字节偏移 byte_count: int # 字节数 content: bytesb # 数据内容 class DataStore: def __init__(self, max_db_number: int 2048, area_size: int 2048): 统一数据存储I、Q、M、DB :param max_db_number: 最大 DB 块号 :param area_size: 每个区域默认大小字节 self.area_size area_size # I / Q / M 区一维数组DBnum0 self.inputs np.zeros(area_size, dtypenp.uint8) # I区 self.outputs np.zeros(area_size, dtypenp.uint8) # Q区 self.memory np.zeros(area_size, dtypenp.uint8) # M区 # DB区二维数组 [DB号][偏移] self.db_blocks np.zeros((max_db_number 1, area_size), dtypenp.uint8) def write(self, var: AreaVar): 统一写入接口支持 I/Q/M/DB data np.frombuffer(var.content, dtypenp.uint8) start var.offset end start var.byte_count if var.area I: self.inputs[start:end] data elif var.area Q: self.outputs[start:end] data elif var.area M: self.memory[start:end] data elif var.area DB: self.db_blocks[var.DBnum, start:end] data else: raise ValueError(f不支持的区域: {var.area}) def read(self, var: AreaVar) - bytes: 统一读取接口支持 I/Q/M/DB start var.offset end start var.byte_count if var.area I: arr self.inputs[start:end] elif var.area Q: arr self.outputs[start:end] elif var.area M: arr self.memory[start:end] elif var.area DB: arr self.db_blocks[var.DBnum, start:end] else: raise ValueError(f不支持的区域: {var.area}) return arr.tobytes() # ------------------- # 测试I/Q/M/DB 全部支持 # ------------------- if __name__ __main__: store DataStore() # 测试 DB 区 db_var AreaVar(DB, 1, 0, 5, b\x113DU) store.write(db_var) res_db store.read(db_var) print(DB1 读写一致:, res_db db_var.content) # 测试 I 区 (DBnum0) i_var AreaVar(I, 0, 10, 3, babc) store.write(i_var) res_i store.read(i_var) print(I 区读写一致:, res_i i_var.content) # 测试 Q 区 q_var AreaVar(Q, 0, 20, 2, b\x01\x02) store.write(q_var) res_q store.read(q_var) print(Q 区读写一致:, res_q q_var.content) # 测试 M 区 m_var AreaVar(M, 0, 5, 4, btest) store.write(m_var) res_m store.read(m_var) print(M 区读写一致:, res_m m_var.content) # 读取 Q 区一个字节 q_var AreaVar(Q, 0, 21, 1) res_q store.read(q_var) print(fQB21读取结果:, {res_q}) # QB21读取结果:, b\x02

更多文章