bleSDK_expansion_board/projects/blezongkong/mdk/RS485_OTA.py

384 lines
13 KiB
Python
Raw Permalink Normal View History

import serial
import serial.tools.list_ports
import struct
import sys
import time
import os
import fnmatch
def crc16(data):
    """Return the 16-bit CRC of *data*.

    The register starts at 0xFFFF, each byte is XORed into the low bits and
    the register is shifted right eight times, XORing in 0xA001 whenever a
    set bit is shifted out. These are the CRC-16/MODBUS parameters.

    Args:
        data: An iterable of integers in the range 0-255 (e.g. ``bytes``).

    Returns:
        The CRC as an integer in the range 0-0xFFFF.
    """
    remainder = 0xFFFF
    for octet in data:
        remainder ^= octet
        for _bit in range(8):
            # Remember the bit that is about to fall off before shifting.
            carry = remainder & 0x0001
            remainder >>= 1
            if carry:
                remainder ^= 0xA001
    return remainder
def create_frame(index, data):
    """Build one wire frame: 2-byte index, payload, 2-byte CRC.

    Args:
        index: Frame index, packed as an unsigned big-endian 16-bit value.
        data: Payload bytes placed directly after the index.

    Returns:
        ``bytes`` consisting of the packed index, *data*, and the CRC of
        (index + data) packed as an unsigned big-endian 16-bit value.
    """
    header = struct.pack('>H', index)
    covered = header + data  # the CRC covers the index and the payload
    checksum = struct.pack('>H', crc16(covered))
    return covered + checksum
def list_serial_ports():
    """Return the device names of all serial ports reported by pyserial.

    Returns:
        A list with the ``device`` attribute of every entry returned by
        ``serial.tools.list_ports.comports()``; empty when none are found.
    """
    devices = []
    for port_info in serial.tools.list_ports.comports():
        devices.append(port_info.device)
    return devices
def list_bin_files(directory):
    """Recursively collect the ``*.bin`` files found under *directory*.

    Args:
        directory: Root directory to walk.

    Returns:
        A list of paths, in ``os.walk`` order. Each path is expressed
        relative to the current working directory when that is possible,
        otherwise as an absolute path.
    """
    working_dir = os.getcwd()
    bin_files = []
    for dirpath, _dirnames, filenames in os.walk(directory):
        for filename in fnmatch.filter(filenames, '*.bin'):
            full_path = os.path.join(dirpath, filename)
            try:
                # Show the path relative to the working directory.
                shown_path = os.path.relpath(full_path, start=working_dir)
            except ValueError:
                # On Windows relpath() raises ValueError when the file and the
                # working directory are on different drives; an absolute path
                # is still usable for open().
                shown_path = os.path.abspath(full_path)
            bin_files.append(shown_path)
    return bin_files
# def list_bin_files(directory):
# bin_files = []
# for dirpath, dirnames, filenames in os.walk(directory):
# for filename in fnmatch.filter(filenames, '*.bin'):
# bin_files.append(os.path.join(dirpath, filename))
# return bin_files
def select_option(options, prompt):
    """Print a numbered menu of *options* and return the entry the user picks.

    Args:
        options: Sequence of choices; they are listed starting from 1.
        prompt: Text passed to ``input()`` when asking for the number.

    Returns:
        The element of *options* matching the 1-based number that was typed.

    Raises:
        SystemExit: With status 1 when the input is not an integer or is
            outside the range 1..len(options).
    """
    for number, option in enumerate(options, start=1):
        print(f"{number}: {option}")
    try:
        selected_index = int(input(prompt)) - 1
    except ValueError:
        # Non-numeric input is treated like an out-of-range number instead
        # of escaping as an unhandled traceback.
        selected_index = -1
    if not 0 <= selected_index < len(options):
        print("无效的选择。")
        sys.exit(1)
    return options[selected_index]
def _is_error_code(code):
    """Return True when *code* is in the 0x5501-0x5514 range treated as an error."""
    return 0x5501 <= code <= 0x5514


def _wait_for_code(port, window, wanted, max_ticks):
    """Poll *port* until a 16-bit big-endian code accepted by *wanted* arrives.

    The port is opened with ``timeout=0``, so a read returns immediately with
    whatever has arrived, possibly a single byte of a two-byte code. Bytes are
    therefore fed one at a time through a two-byte sliding window, so a code
    split across polls (or preceded by a stray byte) is still recognised.

    Args:
        port: Open serial port object providing ``read``.
        window: ``bytearray`` holding the most recent unmatched bytes; it is
            updated in place and keeps its content between calls.
        wanted: Callable taking the candidate code and returning True when
            the wait should end with that code.
        max_ticks: The wait gives up once more than this many polls, spaced
            by a 1 ms sleep, have passed.

    Returns:
        The accepted code, or ``None`` on timeout.
    """
    ticks = 0
    while True:
        # Drain everything that is currently buffered.
        while True:
            received = port.read(1)
            if not received:
                break
            window += received
            del window[:-2]  # keep only the two most recent bytes
            if len(window) == 2:
                code = int.from_bytes(window, byteorder='big')
                if wanted(code):
                    window.clear()
                    return code
        time.sleep(0.001)
        ticks += 1
        if ticks > max_ticks:
            return None


def _send_firmware(port, file_data, frame_size, init_delay):
    """Send the start frame, every data frame and the end frame over *port*.

    Args:
        port: Open serial port object providing ``read`` and ``write``.
        file_data: Non-empty firmware image as ``bytes``.
        frame_size: Payload size of each frame in bytes.
        init_delay: Seconds to listen for an error code after each data frame.

    Raises:
        SystemExit: With status 1 when the device reports an error code or
            an expected acknowledgement does not arrive in time.
    """
    time.sleep(2)  # wait for the serial port to settle
    window = bytearray()

    def is_start_reply(code):
        return code == 0xFF01 or _is_error_code(code)

    def is_end_reply(code):
        return code == 0xFF02 or _is_error_code(code)

    # Start frame: bytes 8..10 of the image, zero-padded to a full payload.
    init_frame = create_frame(0xFF01, file_data[8:11].ljust(frame_size, b'\x00'))
    for _attempt in range(3):
        print(f"Sent start frame: {init_frame.hex().upper()}")
        port.write(init_frame)
        code = _wait_for_code(port, window, is_start_reply, 1000)
        if code == 0xFF01:
            print("Initialization response received: 0xFF01")
            break
        if code == 0x5501:
            print("Firmware partition verification fails. Please check whether the selected firmware is correct.")
            sys.exit(1)
        if code is not None:
            print(f"Received error code: {code:04X}. Stopping file transmission.")
            sys.exit(1)
        print("Initialization response not received. Retrying...")
    else:
        # All three attempts timed out.
        sys.exit(1)

    # Data frames.
    num_frames = (len(file_data) + frame_size - 1) // frame_size
    for index in range(num_frames):
        offset = index * frame_size
        # The last chunk may be short; pad it with zero bytes.
        chunk = file_data[offset:offset + frame_size].ljust(frame_size, b'\x00')
        frame = create_frame(index, chunk)
        port.write(frame)
        if index % 16 == 0:  # pause briefly every 16 data frames
            time.sleep(0.02)
        # Listen for an error code before sending the next frame.
        code = _wait_for_code(port, window, _is_error_code, 1000 * init_delay)
        if code is not None:
            print(f"Received error code: {code:04X}. Stopping file transmission.")
            sys.exit(1)
        # The frame already carries the packed index and CRC.
        print(f"IDX:{frame[:2].hex().upper()}|DATA:{chunk.hex().upper()}|CRC:{frame[-2:].hex().upper()}")

    # End frame: last data index, its bitwise inverse, then zero padding.
    last_index = num_frames - 1
    end_payload = struct.pack('>HH', last_index, ~last_index & 0xFFFF).ljust(frame_size, b'\x00')
    end_frame = create_frame(0xFF02, end_payload)
    print(f"Sent end frame: {end_frame.hex().upper()}")
    port.write(end_frame)

    code = _wait_for_code(port, window, is_end_reply, 1500)
    if code == 0xFF02:
        print("OTA update successfully.")
        return
    if code is None:
        print("OTA update failed: timeout...")
    else:
        print(f"OTA update failed: error code:{code:04X}")
    # Every failure, including a timeout, is reported through the exit status.
    sys.exit(1)


def main():
    """Interactively pick a serial port and a .bin file, then send the file.

    The user chooses among the detected serial ports and among the ``.bin``
    files found under this script's directory. The file is then transmitted
    in 16-byte frames at 9600 baud.

    Raises:
        SystemExit: With status 1 when no port or file is available, the
            selection is invalid, the file is empty or too large, or the
            transfer fails.
    """
    # List the available serial ports.
    serial_ports = list_serial_ports()
    if not serial_ports:
        print("没有找到可用的串口。")
        sys.exit(1)
    print("可用串口:")
    selected_port = select_option(serial_ports, "请选择串口序号: ")

    # List every .bin file in this script's directory and its subdirectories.
    bin_files = list_bin_files(os.path.dirname(os.path.abspath(__file__)))
    if not bin_files:
        print("没有找到任何 .bin 文件。")
        sys.exit(1)
    print("找到以下 .bin 文件:")
    selected_file = select_option(bin_files, "请选择文件序号: ")

    baudrate = 9600  # adjust the baud rate here if needed
    init_delay = 0.04  # adjust the per-frame listening delay here if needed
    frame_size = 16

    # Read and validate the image before touching the serial port.
    with open(selected_file, 'rb') as f:
        file_data = f.read()
    num_frames = (len(file_data) + frame_size - 1) // frame_size
    if num_frames == 0:
        print("Selected firmware file is empty. Nothing to send.")
        sys.exit(1)
    if num_frames > 0x10000:
        # The frame index is packed as an unsigned 16-bit value.
        # NOTE(review): 0xFF01/0xFF02 are also used as start/end frame
        # indices; confirm whether the device needs a lower limit.
        print("Firmware file is too large: the frame index does not fit in 16 bits.")
        sys.exit(1)

    COM_serial = serial.Serial(selected_port, baudrate, timeout=0)
    try:
        _send_firmware(COM_serial, file_data, frame_size, init_delay)
    finally:
        # Release the port on success, on failure and on sys.exit().
        COM_serial.close()


if __name__ == "__main__":
    main()
# import serial
# import struct
# import sys
# import time
# def crc16(data):
# crc = 0xFFFF
# for byte in data:
# crc ^= byte
# for _ in range(8):
# if (crc & 0x0001) != 0:
# crc >>= 1
# crc ^= 0xA001
# else:
# crc >>= 1
# return crc
# def create_frame(index, data):
# crc = crc16(struct.pack('>H', index) + data)
# frame = struct.pack('>H', index) + data + struct.pack('>H', crc)
# return frame
# def save_frame_to_file(frame, index):
# with open('frames.txt', 'a') as f:
# f.write(f"Frame {index}: {frame.hex().upper()}\n")
# def main():
# if len(sys.argv) != 5:
# print("Usage: python serial_sender.py <port> <baudrate> <file> <float_delay>")
# sys.exit(1)
# port = sys.argv[1]
# baudrate = int(sys.argv[2])
# file_path = sys.argv[3]
# init_delay = float(sys.argv[4])
# COM_serial = serial.Serial(port, baudrate, timeout=0)
# with open(file_path, 'rb') as f:
# file_data = f.read()
# time.sleep(2) #等待串口稳定
# # 发送初始化帧
# chunk = file_data[8:11]
# chunk = chunk.ljust(16, b'\x00')
# init_frame = create_frame(0xFF01, chunk)
# # init_frame = create_frame(0xFF01, b'\x00' * 16)
# # response = COM_serial.read(2)
# # print(f"Received response: {response.hex().upper()}")
# count =0
# ACK_OK = False
# while True:
# print(f"Sent start frame: {init_frame.hex().upper()}")
# COM_serial.write(init_frame)
# time_count = 0
# while True:
# response = COM_serial.read(2)
# error_code = int.from_bytes(response, byteorder='big')
# if 0xFF01 == error_code:
# ACK_OK = True
# print("Initialization response received: 0xFF01")
# break
# elif 0x5501 == error_code:
# print("Firmware partition verification fails. Please check whether the selected firmware is correct.")
# sys.exit(1)
# # break
# elif 0x5502 <= error_code <= 0x5514:
# print(f"Received error code: {error_code:04X}. Stopping file transmission.")
# # sys.exit(1)
# break
# else:
# time.sleep(0.001)
# time_count += 1
# if time_count > 1000:
# count += 1
# print("Initialization response not received. Retrying...")
# break
# if ACK_OK:
# break
# if count >= 3:
# sys.exit(1)
# # save_frame_to_file(init_frame, 0)
# index = 0
# frame_size = 16
# num_frames = (len(file_data) + frame_size - 1) // frame_size
# # 发送数据帧
# for i in range(num_frames):
# start = i * frame_size
# end = start + frame_size
# chunk = file_data[start:end]
# if len(chunk) < frame_size:
# chunk = chunk.ljust(frame_size, b'\x00')
# frame = create_frame(index, chunk) # 发送数据帧
# COM_serial.write(frame)
# if (index % 16 == 0): # 每16个数据帧delay一下
# # print("delay...")
# time.sleep(0.02)
# # 尝试接收响应和错误码
# time_count = 0
# while True:
# response = COM_serial.read(2)
# error_code = int.from_bytes(response, byteorder='big')
# if 0x5501 <= error_code <= 0x5514:
# print(f"Received error code: {error_code:04X}. Stopping file transmission.")
# sys.exit(1)
# else:
# time.sleep(0.001)
# time_count += 1
# if time_count > (1000 * init_delay):
# # print("timeout...")
# break
# # response = COM_serial.read(2)
# # if response:
# # error_code = int.from_bytes(response, byteorder='big')
# # if 0x5501 <= error_code <= 0x5514:
# # print(f"Received error code: {error_code:04X}. Stopping file transmission.")
# # sys.exit(1)
# # # save_frame_to_file(frame, index)
# # time.sleep(init_delay)
# frame_crc = struct.pack('>H',crc16(struct.pack('>H', index) + chunk))
# frame_index = struct.pack('>H', index)
# print(f"IDX:{frame_index.hex().upper()}|DATA:{chunk.hex().upper()}|CRC:{frame_crc.hex().upper()}")
# index += 1
# # 发送尾帧
# end_frame_index = index - 1
# end_frame_index_bytes = struct.pack('>H', end_frame_index)
# end_frame_index_inverted = struct.pack('>H', ~end_frame_index & 0xFFFF)
# end_frame_data = end_frame_index_bytes + end_frame_index_inverted + b'\x00' * 12
# end_frame = create_frame(0xFF02, end_frame_data)
# print(f"Sent end frame: {end_frame.hex().upper()}")
# COM_serial.write(end_frame)
# # 尝试接收尾帧响应和错误码
# time_count = 0
# while True:
# response = COM_serial.read(2)
# error_code = int.from_bytes(response, byteorder='big')
# if 0xFF02 == error_code:
# print("OTA update successfully.")
# break
# elif 0x5501 <= error_code <= 0x5514:
# print(f"OTA update failed: error code:{error_code:04X}")
# sys.exit(1)
# else:
# time.sleep(0.001)
# time_count += 1
# if time_count > 1500:
# print("OTA update failed: timeout...")
# break
# # save_frame_to_file(end_frame, index)
# # end_frame_count = 0
# # while True:
# # # if COM_serial.in_waiting > 0: # 检查是否有数据可读
# # response = COM_serial.read(2)
# # if response:
# # print(f"Received response: {response.hex().upper()}")
# # error_code = int.from_bytes(response, byteorder='big')
# # if 0x5501 <= error_code <= 0x5514:
# # print(f"Received error code: {error_code:04X}. Stopping file transmission.")
# # sys.exit(1)
# # elif error_code == 0xFF02:
# # print("OTA update successfully.")
# # break
# # else:
# # end_frame_count +=1
# # if end_frame_count > 10:
# # print("End frame response not received. Stopping file transmission.")
# # sys.exit(1)
# # print("End frame response not received. Retrying...")
# if __name__ == "__main__":
# main()