emulator v1
This commit is contained in:
commit
2fe1ddee70
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
||||
.venv
|
35
Readme.md
Normal file
35
Readme.md
Normal file
@ -0,0 +1,35 @@
|
||||
|
||||
|
||||
#### Эмулятор rs-485 по modbus для linux
|
||||
|
||||
0. Создаем и активируем виртуальное окружение
|
||||
```
|
||||
python -m venv .venv
|
||||
```
|
||||
|
||||
1. Перед запуском устанавливаем зависимости:
|
||||
```
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
2. Монтируем виртуальные порты: (смонтированы будут как /dev/pts/*, где * - номер)
|
||||
```
|
||||
socat -d -d pty,raw,echo=0 pty,raw,echo=0
|
||||
```
|
||||
|
||||
3. Настраиваем эмулятор
|
||||
```
|
||||
# Настройка последовательного соединения
|
||||
ser = serial.Serial(
|
||||
port='/dev/pts/5', # Используйте правильный порт из socat
|
||||
baudrate=9600, # Скорость обмена
|
||||
parity=serial.PARITY_NONE, #
|
||||
stopbits=serial.STOPBITS_ONE, # Бит остановки
|
||||
bytesize=serial.EIGHTBITS, # Размер сообщения
|
||||
timeout=1 # Таймаут
|
||||
)
|
||||
```
|
||||
4. Запуск:
|
||||
```
|
||||
python main.py
|
||||
```
|
84
emulate.py
Normal file
84
emulate.py
Normal file
@ -0,0 +1,84 @@
|
||||
import serial
|
||||
import struct
|
||||
import random
|
||||
import time
|
||||
|
||||
# Адрес Modbus-устройства (slave)
|
||||
SLAVE_ADDRESS = 1
|
||||
|
||||
# Адрес регистра для расстояния
|
||||
DISTANCE_REGISTER = 0x0001
|
||||
|
||||
# Настройка последовательного соединения
|
||||
ser = serial.Serial(
|
||||
port='/dev/pts/5', # Используйте правильный порт из socat
|
||||
baudrate=9600,
|
||||
parity=serial.PARITY_NONE,
|
||||
stopbits=serial.STOPBITS_ONE,
|
||||
bytesize=serial.EIGHTBITS,
|
||||
timeout=1
|
||||
)
|
||||
|
||||
# Функция расчёта CRC16
|
||||
def _calculate_crc16(data):
|
||||
crc = 0xFFFF
|
||||
for byte in data:
|
||||
crc ^= byte
|
||||
for _ in range(8):
|
||||
if crc & 0x0001:
|
||||
crc = (crc >> 1) ^ 0xA001
|
||||
else:
|
||||
crc >>= 1
|
||||
return crc
|
||||
|
||||
# Конвертация float в формат IEEE 754 (binary32)
|
||||
def float_to_ieee754(value):
|
||||
return struct.pack('>f', value)
|
||||
|
||||
# Эмуляция ультразвукового датчика - генерация случайного расстояния
|
||||
def get_emulated_distance():
|
||||
return random.uniform(0.10, 4.00) # Возвращаем случайное расстояние от 0.10 м до 4.00 м
|
||||
|
||||
# Функция обработки запроса Modbus
|
||||
def handle_read_request(request):
|
||||
function_code = request[1]
|
||||
register_address = (request[2] << 8) | request[3]
|
||||
|
||||
if function_code == 0x04 and register_address == DISTANCE_REGISTER:
|
||||
distance_value = get_emulated_distance()
|
||||
|
||||
# Конвертируем значение в формат IEEE 754 (4 байта)
|
||||
data_bytes = float_to_ieee754(distance_value)
|
||||
|
||||
# Формируем ответ: [адрес слейва, код функции, длина данных, данные...]
|
||||
response_data = bytes([SLAVE_ADDRESS, function_code, len(data_bytes)]) + data_bytes
|
||||
|
||||
# Рассчитываем CRC16 для ответа
|
||||
crc16 = _calculate_crc16(response_data)
|
||||
crc_bytes = struct.pack('<H', crc16)
|
||||
|
||||
# Формируем полное сообщение и отправляем его
|
||||
response_message = response_data + crc_bytes
|
||||
|
||||
print(f"Sent distance: {distance_value:.2f} meters")
|
||||
print(f"Response message (hex): {response_message.hex()}") # Выводим сообщение в шестнадцатеричном формате
|
||||
|
||||
ser.write(response_message)
|
||||
else:
|
||||
print("Unknown function code or register address")
|
||||
|
||||
# Основной цикл для чтения запросов Modbus
|
||||
def read_modbus_requests():
|
||||
try:
|
||||
while True:
|
||||
# Чтение запроса (обычно длина пакета запроса Modbus — 8 байт)
|
||||
request = ser.read(8)
|
||||
if len(request) == 8:
|
||||
print(f"Received request (hex): {request.hex()}")
|
||||
handle_read_request(request)
|
||||
time.sleep(0.1)
|
||||
except KeyboardInterrupt:
|
||||
ser.close()
|
||||
|
||||
# Запуск эмуляции устройства
|
||||
read_modbus_requests()
|
2
requirements.txt
Normal file
2
requirements.txt
Normal file
@ -0,0 +1,2 @@
|
||||
pymodbus==3.7.4
|
||||
pyserial==3.5
|
41
test.py
Normal file
41
test.py
Normal file
@ -0,0 +1,41 @@
|
||||
import serial
|
||||
import struct
|
||||
|
||||
# Подключение к виртуальному порту (например, /dev/pts/6)
|
||||
ser = serial.Serial('/dev/pts/6', 9600, timeout=1)
|
||||
|
||||
# Пример запроса Modbus (функция 0x04, чтение регистра 0x0001)
|
||||
request = bytes([1, 4, 0x00, 0x01, 0x00, 0x01, 0x31, 0xCA])
|
||||
|
||||
# Отправка запроса
|
||||
ser.write(request)
|
||||
|
||||
# Чтение ответа (Минимум 7 байт: 1 байт адрес, 1 байт код функции, 1 байт количество данных, 4 байта данных, 2 байта CRC)
|
||||
response = ser.read(9) # Ожидаем 9 байт (учитывая длину данных)
|
||||
|
||||
if len(response) >= 9:
|
||||
print(f'Response (hex): {response.hex()}')
|
||||
|
||||
# Распарсим ответ
|
||||
slave_address = response[0] # Адрес устройства
|
||||
function_code = response[1] # Код функции
|
||||
byte_count = response[2] # Количество байт данных
|
||||
|
||||
# Получаем 4 байта данных (float в IEEE 754)
|
||||
data_bytes = response[3:3+4]
|
||||
crc_received = struct.unpack('<H', response[7:9])[0] # CRC16 (2 байта)
|
||||
|
||||
# Конвертируем данные из формата IEEE 754 в float
|
||||
distance_value = struct.unpack('>f', data_bytes)[0]
|
||||
|
||||
# Вывод распарсенной информации
|
||||
print(f"Slave Address: {slave_address}")
|
||||
print(f"Function Code: {function_code}")
|
||||
print(f"Byte Count: {byte_count}")
|
||||
print(f"Distance Value (meters): {distance_value:.2f}")
|
||||
print(f"CRC Received: {hex(crc_received)}")
|
||||
|
||||
else:
|
||||
print("Incomplete response received")
|
||||
|
||||
ser.close()
|
Loading…
Reference in New Issue
Block a user