from pymodbus.client.tcp import ModbusTcpClient from pymodbus.exceptions import ModbusException import struct def read_register_float_value(host: str, port: int, address: list[int]): """ 从modbus 读取数据 :param port: 端口 :param host: 地址 :param address: 减去40000的偏移量 :return: float值 """ result_dict = {} client = ModbusTcpClient(host=host, port=port) try: # 连接到 Modbus 设备 if client.connect(): for add in address: # 从起始地址为 40002 的寄存器读取数据(Modbus 地址从 0 开始,所以 40003 对应 40002) result = client.read_holding_registers(address=add, count=2) # 读取 2 个寄存器,因为一个 float 占用 2 个寄存器 if not result.isError(): # 获取寄存器值列表 register_values = result.registers # 将寄存器值转换为字节表示 packed_bytes = bytes() for value in register_values: packed_bytes += value.to_bytes(2, byteorder='big') # 将字节表示转换为浮点数 float_value = struct.unpack('>f', packed_bytes)[0] # 使用大端序 result_dict[add] = float_value else: print("读取数据失败:", result) return False, None return True, result_dict else: print("无法连接到 Modbus 设备") return False, None except ModbusException as e: print("发生 Modbus 异常:", e) return False, None finally: # 关闭连接 client.close() def write_register(host: str, port: int, address: int, value): """ 写数据 :param host: 地址 :param port: 端口 :param address: 地址 :param value: 值 :return: """ client = ModbusTcpClient(host=host, port=port) try: # 连接到 Modbus 设备 if client.connect(): # 将浮点数转换为字节表示 packed_float = struct.pack('>f', value) # 将字节转换为 16 位整数列表,因为每个寄存器是 16 位 registers = [int.from_bytes(packed_float[i:i + 2], byteorder='big', signed=False) for i in range(0, len(packed_float), 2)] # 向起始地址为 40000 的寄存器写入数据(Modbus 地址从 0 开始,所以 40001 对应 40000) result = client.write_registers(address=address, values=registers) if not result.isError(): return True, "浮点数写入成功" else: return False, "浮点数写入失败:" + result else: return False, "无法连接到 Modbus 设备" except ModbusException as e: return False, "发生 Modbus 异常:" + e finally: # 关闭连接 client.close()