85 lines
3.1 KiB
Python
85 lines
3.1 KiB
Python
import serial
|
||
import struct
|
||
import random
|
||
import time
|
||
|
||
# Адрес Modbus-устройства (slave)
|
||
SLAVE_ADDRESS = 1
|
||
|
||
# Адрес регистра для расстояния
|
||
DISTANCE_REGISTER = 0x0001
|
||
|
||
# Настройка последовательного соединения
|
||
ser = serial.Serial(
|
||
port='/dev/pts/5', # Используйте правильный порт из socat
|
||
baudrate=9600,
|
||
parity=serial.PARITY_NONE,
|
||
stopbits=serial.STOPBITS_ONE,
|
||
bytesize=serial.EIGHTBITS,
|
||
timeout=1
|
||
)
|
||
|
||
# Функция расчёта CRC16
|
||
def _calculate_crc16(data):
|
||
crc = 0xFFFF
|
||
for byte in data:
|
||
crc ^= byte
|
||
for _ in range(8):
|
||
if crc & 0x0001:
|
||
crc = (crc >> 1) ^ 0xA001
|
||
else:
|
||
crc >>= 1
|
||
return crc
|
||
|
||
# Конвертация float в формат IEEE 754 (binary32)
|
||
def float_to_ieee754(value):
|
||
return struct.pack('>f', value)
|
||
|
||
# Эмуляция ультразвукового датчика - генерация случайного расстояния
|
||
def get_emulated_distance():
|
||
return random.uniform(0.10, 4.00) # Возвращаем случайное расстояние от 0.10 м до 4.00 м
|
||
|
||
# Функция обработки запроса Modbus
|
||
def handle_read_request(request):
|
||
function_code = request[1]
|
||
register_address = (request[2] << 8) | request[3]
|
||
|
||
if function_code == 0x04 and register_address == DISTANCE_REGISTER:
|
||
distance_value = get_emulated_distance()
|
||
|
||
# Конвертируем значение в формат IEEE 754 (4 байта)
|
||
data_bytes = float_to_ieee754(distance_value)
|
||
|
||
# Формируем ответ: [адрес слейва, код функции, длина данных, данные...]
|
||
response_data = bytes([SLAVE_ADDRESS, function_code, len(data_bytes)]) + data_bytes
|
||
|
||
# Рассчитываем CRC16 для ответа
|
||
crc16 = _calculate_crc16(response_data)
|
||
crc_bytes = struct.pack('<H', crc16)
|
||
|
||
# Формируем полное сообщение и отправляем его
|
||
response_message = response_data + crc_bytes
|
||
|
||
print(f"Sent distance: {distance_value:.2f} meters")
|
||
print(f"Response message (hex): {response_message.hex()}") # Выводим сообщение в шестнадцатеричном формате
|
||
|
||
ser.write(response_message)
|
||
else:
|
||
print("Unknown function code or register address")
|
||
|
||
# Основной цикл для чтения запросов Modbus
|
||
def read_modbus_requests():
|
||
try:
|
||
while True:
|
||
# Чтение запроса (обычно длина пакета запроса Modbus — 8 байт)
|
||
request = ser.read(8)
|
||
if len(request) == 8:
|
||
print(f"Received request (hex): {request.hex()}")
|
||
handle_read_request(request)
|
||
time.sleep(0.1)
|
||
except KeyboardInterrupt:
|
||
ser.close()
|
||
|
||
# Запуск эмуляции устройства
|
||
read_modbus_requests()
|