384 lines
13 KiB
Python
384 lines
13 KiB
Python
|
import serial
|
||
|
import serial.tools.list_ports
|
||
|
import struct
|
||
|
import sys
|
||
|
import time
|
||
|
import os
|
||
|
import fnmatch
|
||
|
|
||
|
def crc16(data):
|
||
|
crc = 0xFFFF
|
||
|
for byte in data:
|
||
|
crc ^= byte
|
||
|
for _ in range(8):
|
||
|
if (crc & 0x0001) != 0:
|
||
|
crc >>= 1
|
||
|
crc ^= 0xA001
|
||
|
else:
|
||
|
crc >>= 1
|
||
|
return crc
|
||
|
|
||
|
def create_frame(index, data):
|
||
|
crc = crc16(struct.pack('>H', index) + data)
|
||
|
frame = struct.pack('>H', index) + data + struct.pack('>H', crc)
|
||
|
return frame
|
||
|
|
||
|
def list_serial_ports():
|
||
|
ports = serial.tools.list_ports.comports()
|
||
|
return [port.device for port in ports]
|
||
|
|
||
|
def list_bin_files(directory):
|
||
|
bin_files = []
|
||
|
for dirpath, dirnames, filenames in os.walk(directory):
|
||
|
for filename in fnmatch.filter(filenames, '*.bin'):
|
||
|
# 使用 os.path.relpath 将路径转换为相对路径
|
||
|
relative_path = os.path.relpath(os.path.join(dirpath, filename), start=os.getcwd())
|
||
|
bin_files.append(relative_path)
|
||
|
return bin_files
|
||
|
|
||
|
# def list_bin_files(directory):
|
||
|
# bin_files = []
|
||
|
# for dirpath, dirnames, filenames in os.walk(directory):
|
||
|
# for filename in fnmatch.filter(filenames, '*.bin'):
|
||
|
# bin_files.append(os.path.join(dirpath, filename))
|
||
|
# return bin_files
|
||
|
|
||
|
def select_option(options, prompt):
|
||
|
for index, option in enumerate(options):
|
||
|
print(f"{index + 1}: {option}")
|
||
|
selected_index = int(input(prompt)) - 1
|
||
|
if selected_index < 0 or selected_index >= len(options):
|
||
|
print("无效的选择。")
|
||
|
sys.exit(1)
|
||
|
return options[selected_index]
|
||
|
|
||
|
def main():
|
||
|
# 列出可用串口
|
||
|
serial_ports = list_serial_ports()
|
||
|
if not serial_ports:
|
||
|
print("没有找到可用的串口。")
|
||
|
sys.exit(1)
|
||
|
|
||
|
print("可用串口:")
|
||
|
selected_port = select_option(serial_ports, "请选择串口序号: ")
|
||
|
|
||
|
# 列出当前目录及子目录下所有的bin文件
|
||
|
bin_files = list_bin_files(os.path.dirname(os.path.abspath(__file__)))
|
||
|
if not bin_files:
|
||
|
print("没有找到任何 .bin 文件。")
|
||
|
sys.exit(1)
|
||
|
|
||
|
print("找到以下 .bin 文件:")
|
||
|
selected_file = select_option(bin_files, "请选择文件序号: ")
|
||
|
|
||
|
baudrate = 9600 # 这里可以根据需要设置波特率
|
||
|
init_delay = 0.04 # 这里也可以根据需要设置延迟
|
||
|
|
||
|
COM_serial = serial.Serial(selected_port, baudrate, timeout=0)
|
||
|
|
||
|
with open(selected_file, 'rb') as f:
|
||
|
file_data = f.read()
|
||
|
|
||
|
time.sleep(2) # 等待串口稳定
|
||
|
|
||
|
# 发送初始化帧
|
||
|
chunk = file_data[8:11]
|
||
|
chunk = chunk.ljust(16, b'\x00')
|
||
|
init_frame = create_frame(0xFF01, chunk)
|
||
|
|
||
|
count = 0
|
||
|
ACK_OK = False
|
||
|
while True:
|
||
|
print(f"Sent start frame: {init_frame.hex().upper()}")
|
||
|
COM_serial.write(init_frame)
|
||
|
time_count = 0
|
||
|
while True:
|
||
|
response = COM_serial.read(2)
|
||
|
if len(response) == 2:
|
||
|
error_code = int.from_bytes(response, byteorder='big')
|
||
|
if 0xFF01 == error_code:
|
||
|
ACK_OK = True
|
||
|
print("Initialization response received: 0xFF01")
|
||
|
break
|
||
|
elif 0x5501 == error_code:
|
||
|
print("Firmware partition verification fails. Please check whether the selected firmware is correct.")
|
||
|
sys.exit(1)
|
||
|
elif 0x5502 <= error_code <= 0x5514:
|
||
|
print(f"Received error code: {error_code:04X}. Stopping file transmission.")
|
||
|
sys.exit(1)
|
||
|
time.sleep(0.001)
|
||
|
time_count += 1
|
||
|
if time_count > 1000:
|
||
|
count += 1
|
||
|
print("Initialization response not received. Retrying...")
|
||
|
break
|
||
|
if ACK_OK:
|
||
|
break
|
||
|
if count >= 3:
|
||
|
sys.exit(1)
|
||
|
|
||
|
index = 0
|
||
|
frame_size = 16
|
||
|
num_frames = (len(file_data) + frame_size - 1) // frame_size
|
||
|
|
||
|
# 发送数据帧
|
||
|
for i in range(num_frames):
|
||
|
start = i * frame_size
|
||
|
end = start + frame_size
|
||
|
chunk = file_data[start:end]
|
||
|
|
||
|
if len(chunk) < frame_size:
|
||
|
chunk = chunk.ljust(frame_size, b'\x00')
|
||
|
|
||
|
frame = create_frame(index, chunk)
|
||
|
COM_serial.write(frame)
|
||
|
if (index % 16 == 0): # 每16个数据帧delay一下
|
||
|
time.sleep(0.02)
|
||
|
|
||
|
# 尝试接收响应和错误码
|
||
|
time_count = 0
|
||
|
while True:
|
||
|
response = COM_serial.read(2)
|
||
|
if len(response) == 2:
|
||
|
error_code = int.from_bytes(response, byteorder='big')
|
||
|
if 0x5501 <= error_code <= 0x5514:
|
||
|
print(f"Received error code: {error_code:04X}. Stopping file transmission.")
|
||
|
sys.exit(1)
|
||
|
time.sleep(0.001)
|
||
|
time_count += 1
|
||
|
if time_count > (1000 * init_delay):
|
||
|
break
|
||
|
|
||
|
frame_crc = struct.pack('>H', crc16(struct.pack('>H', index) + chunk))
|
||
|
print(f"IDX:{struct.pack('>H', index).hex().upper()}|DATA:{chunk.hex().upper()}|CRC:{frame_crc.hex().upper()}")
|
||
|
index += 1
|
||
|
|
||
|
# 发送尾帧
|
||
|
end_frame_index = index - 1
|
||
|
end_frame_index_bytes = struct.pack('>H', end_frame_index)
|
||
|
end_frame_index_inverted = struct.pack('>H', ~end_frame_index & 0xFFFF)
|
||
|
end_frame_data = end_frame_index_bytes + end_frame_index_inverted + b'\x00' * 12
|
||
|
end_frame = create_frame(0xFF02, end_frame_data)
|
||
|
|
||
|
print(f"Sent end frame: {end_frame.hex().upper()}")
|
||
|
COM_serial.write(end_frame)
|
||
|
|
||
|
# 尝试接收尾帧响应和错误码
|
||
|
time_count = 0
|
||
|
while True:
|
||
|
response = COM_serial.read(2)
|
||
|
if len(response) == 2:
|
||
|
error_code = int.from_bytes(response, byteorder='big')
|
||
|
if 0xFF02 == error_code:
|
||
|
print("OTA update successfully.")
|
||
|
break
|
||
|
elif 0x5501 <= error_code <= 0x5514:
|
||
|
print(f"OTA update failed: error code:{error_code:04X}")
|
||
|
sys.exit(1)
|
||
|
time.sleep(0.001)
|
||
|
time_count += 1
|
||
|
if time_count > 1500:
|
||
|
print("OTA update failed: timeout...")
|
||
|
break
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
main()
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
|
||
|
# import serial
|
||
|
# import struct
|
||
|
# import sys
|
||
|
# import time
|
||
|
|
||
|
# def crc16(data):
|
||
|
# crc = 0xFFFF
|
||
|
# for byte in data:
|
||
|
# crc ^= byte
|
||
|
# for _ in range(8):
|
||
|
# if (crc & 0x0001) != 0:
|
||
|
# crc >>= 1
|
||
|
# crc ^= 0xA001
|
||
|
# else:
|
||
|
# crc >>= 1
|
||
|
# return crc
|
||
|
|
||
|
# def create_frame(index, data):
|
||
|
# crc = crc16(struct.pack('>H', index) + data)
|
||
|
# frame = struct.pack('>H', index) + data + struct.pack('>H', crc)
|
||
|
# return frame
|
||
|
|
||
|
# def save_frame_to_file(frame, index):
|
||
|
# with open('frames.txt', 'a') as f:
|
||
|
# f.write(f"Frame {index}: {frame.hex().upper()}\n")
|
||
|
|
||
|
# def main():
|
||
|
# if len(sys.argv) != 5:
|
||
|
# print("Usage: python serial_sender.py <port> <baudrate> <file> <float_delay>")
|
||
|
# sys.exit(1)
|
||
|
|
||
|
# port = sys.argv[1]
|
||
|
# baudrate = int(sys.argv[2])
|
||
|
# file_path = sys.argv[3]
|
||
|
# init_delay = float(sys.argv[4])
|
||
|
|
||
|
# COM_serial = serial.Serial(port, baudrate, timeout=0)
|
||
|
|
||
|
# with open(file_path, 'rb') as f:
|
||
|
# file_data = f.read()
|
||
|
|
||
|
# time.sleep(2) #等待串口稳定
|
||
|
|
||
|
|
||
|
# # 发送初始化帧
|
||
|
# chunk = file_data[8:11]
|
||
|
# chunk = chunk.ljust(16, b'\x00')
|
||
|
# init_frame = create_frame(0xFF01, chunk)
|
||
|
|
||
|
# # init_frame = create_frame(0xFF01, b'\x00' * 16)
|
||
|
|
||
|
# # response = COM_serial.read(2)
|
||
|
# # print(f"Received response: {response.hex().upper()}")
|
||
|
# count =0
|
||
|
# ACK_OK = False
|
||
|
# while True:
|
||
|
# print(f"Sent satrt frame: {init_frame.hex().upper()}")
|
||
|
# COM_serial.write(init_frame)
|
||
|
# time_count = 0
|
||
|
# while True:
|
||
|
# response = COM_serial.read(2)
|
||
|
# error_code = int.from_bytes(response, byteorder='big')
|
||
|
# if 0xFF01 == error_code:
|
||
|
# ACK_OK = True
|
||
|
# print("Initialization response received: 0xFF01")
|
||
|
# break
|
||
|
# elif 0x5501 == error_code:
|
||
|
# print("Firmware partition verification fails. Please check whether the selected firmware is correct.")
|
||
|
# sys.exit(1)
|
||
|
# # break
|
||
|
# elif 0x5502 <= error_code <= 0x5514:
|
||
|
# print(f"Received error code: {error_code:04X}. Stopping file transmission.")
|
||
|
# # sys.exit(1)
|
||
|
# break
|
||
|
# else:
|
||
|
# time.sleep(0.001)
|
||
|
# time_count += 1
|
||
|
# if time_count > 1000:
|
||
|
# count += 1
|
||
|
# print("Initialization response not received. Retrying...")
|
||
|
# break
|
||
|
# if ACK_OK:
|
||
|
# break
|
||
|
# if count >= 3:
|
||
|
# sys.exit(1)
|
||
|
|
||
|
# # save_frame_to_file(init_frame, 0)
|
||
|
|
||
|
# index = 0
|
||
|
# frame_size = 16
|
||
|
# num_frames = (len(file_data) + frame_size - 1) // frame_size
|
||
|
|
||
|
# # 发送数据帧
|
||
|
# for i in range(num_frames):
|
||
|
# start = i * frame_size
|
||
|
# end = start + frame_size
|
||
|
# chunk = file_data[start:end]
|
||
|
|
||
|
# if len(chunk) < frame_size:
|
||
|
# chunk = chunk.ljust(frame_size, b'\x00')
|
||
|
|
||
|
# frame = create_frame(index, chunk) # 发送数据帧
|
||
|
# COM_serial.write(frame)
|
||
|
# if (index % 16 == 0): # 每16个数据帧delay一下
|
||
|
# # print("delay...")
|
||
|
# time.sleep(0.02)
|
||
|
# # 尝试接收响应和错误码
|
||
|
# time_count = 0
|
||
|
# while True:
|
||
|
# response = COM_serial.read(2)
|
||
|
# error_code = int.from_bytes(response, byteorder='big')
|
||
|
# if 0x5501 <= error_code <= 0x5514:
|
||
|
# print(f"Received error code: {error_code:04X}. Stopping file transmission.")
|
||
|
# sys.exit(1)
|
||
|
# else:
|
||
|
# time.sleep(0.001)
|
||
|
# time_count += 1
|
||
|
# if time_count > (1000 * init_delay):
|
||
|
# # print("timeout...")
|
||
|
# break
|
||
|
|
||
|
# # response = COM_serial.read(2)
|
||
|
# # if response:
|
||
|
# # error_code = int.from_bytes(response, byteorder='big')
|
||
|
# # if 0x5501 <= error_code <= 0x5514:
|
||
|
# # print(f"Received error code: {error_code:04X}. Stopping file transmission.")
|
||
|
# # sys.exit(1)
|
||
|
# # # save_frame_to_file(frame, index)
|
||
|
# # time.sleep(init_delay)
|
||
|
|
||
|
# frame_crc = struct.pack('>H',crc16(struct.pack('>H', index) + chunk))
|
||
|
# frame_index = struct.pack('>H', index)
|
||
|
# print(f"IDX:{frame_index.hex().upper()}|DATA:{chunk.hex().upper()}|CRC:{frame_crc.hex().upper()}")
|
||
|
# index += 1
|
||
|
|
||
|
|
||
|
|
||
|
# # 发送尾帧
|
||
|
# end_frame_index = index - 1
|
||
|
# end_frame_index_bytes = struct.pack('>H', end_frame_index)
|
||
|
# end_frame_index_inverted = struct.pack('>H', ~end_frame_index & 0xFFFF)
|
||
|
# end_frame_data = end_frame_index_bytes + end_frame_index_inverted + b'\x00' * 12
|
||
|
# end_frame = create_frame(0xFF02, end_frame_data)
|
||
|
|
||
|
# print(f"Sent end frame: {end_frame.hex().upper()}")
|
||
|
# COM_serial.write(end_frame)
|
||
|
# # 尝试接收尾帧响应和错误码
|
||
|
# time_count = 0
|
||
|
# while True:
|
||
|
# response = COM_serial.read(2)
|
||
|
# error_code = int.from_bytes(response, byteorder='big')
|
||
|
# if 0xFF02 == error_code:
|
||
|
# print("OTA update successfully.")
|
||
|
# break
|
||
|
# elif 0x5501 <= error_code <= 0x5514:
|
||
|
# print(f"OTA update failed: error code:{error_code:04X}")
|
||
|
# sys.exit(1)
|
||
|
# else:
|
||
|
# time.sleep(0.001)
|
||
|
# time_count += 1
|
||
|
# if time_count > 1500:
|
||
|
# print("OTA update failed: timeout...")
|
||
|
# break
|
||
|
# # save_frame_to_file(end_frame, index)
|
||
|
|
||
|
|
||
|
# # end_frame_count = 0
|
||
|
# # while True:
|
||
|
# # # if COM_serial.in_waiting > 0: # 检查是否有数据可读
|
||
|
# # response = COM_serial.read(2)
|
||
|
# # if response:
|
||
|
# # print(f"Received response: {response.hex().upper()}")
|
||
|
# # error_code = int.from_bytes(response, byteorder='big')
|
||
|
# # if 0x5501 <= error_code <= 0x5514:
|
||
|
# # print(f"Received error code: {error_code:04X}. Stopping file transmission.")
|
||
|
# # sys.exit(1)
|
||
|
# # elif error_code == 0xFF02:
|
||
|
# # print("OTA update successfully.")
|
||
|
# # break
|
||
|
# # else:
|
||
|
# # end_frame_count +=1
|
||
|
# # if end_frame_count > 10:
|
||
|
# # print("End frame response not received. Stopping file transmission.")
|
||
|
# # sys.exit(1)
|
||
|
# # print("End frame response not received. Retrying...")
|
||
|
|
||
|
# if __name__ == "__main__":
|
||
|
# main()
|