rs-485-emulator/emulate.py
2024-10-23 10:33:21 +07:00

85 lines
3.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import serial
import struct
import random
import time
# Адрес Modbus-устройства (slave)
SLAVE_ADDRESS = 1
# Адрес регистра для расстояния
DISTANCE_REGISTER = 0x0001
# Настройка последовательного соединения
ser = serial.Serial(
port='/dev/pts/5', # Используйте правильный порт из socat
baudrate=9600,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout=1
)
# Функция расчёта CRC16
def _calculate_crc16(data):
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc >>= 1
return crc
# Конвертация float в формат IEEE 754 (binary32)
def float_to_ieee754(value):
return struct.pack('>f', value)
# Эмуляция ультразвукового датчика - генерация случайного расстояния
def get_emulated_distance():
return random.uniform(0.10, 4.00) # Возвращаем случайное расстояние от 0.10 м до 4.00 м
# Функция обработки запроса Modbus
def handle_read_request(request):
function_code = request[1]
register_address = (request[2] << 8) | request[3]
if function_code == 0x04 and register_address == DISTANCE_REGISTER:
distance_value = get_emulated_distance()
# Конвертируем значение в формат IEEE 754 (4 байта)
data_bytes = float_to_ieee754(distance_value)
# Формируем ответ: [адрес слейва, код функции, длина данных, данные...]
response_data = bytes([SLAVE_ADDRESS, function_code, len(data_bytes)]) + data_bytes
# Рассчитываем CRC16 для ответа
crc16 = _calculate_crc16(response_data)
crc_bytes = struct.pack('<H', crc16)
# Формируем полное сообщение и отправляем его
response_message = response_data + crc_bytes
print(f"Sent distance: {distance_value:.2f} meters")
print(f"Response message (hex): {response_message.hex()}") # Выводим сообщение в шестнадцатеричном формате
ser.write(response_message)
else:
print("Unknown function code or register address")
# Основной цикл для чтения запросов Modbus
def read_modbus_requests():
try:
while True:
# Чтение запроса (обычно длина пакета запроса Modbus — 8 байт)
request = ser.read(8)
if len(request) == 8:
print(f"Received request (hex): {request.hex()}")
handle_read_request(request)
time.sleep(0.1)
except KeyboardInterrupt:
ser.close()
# Запуск эмуляции устройства
read_modbus_requests()