"""Interactive OTA firmware uploader over a serial port.

The script lets the user pick a serial port and a ``.bin`` file, then streams
the file to the device as fixed-size frames:

* every frame is ``index (2 bytes, big-endian) + payload + CRC16 (2 bytes,
  big-endian)``, where the CRC covers the index and the payload;
* an init frame (index ``0xFF01``) is sent first and must be acknowledged
  with the status word ``0xFF01``;
* data frames carry 16-byte chunks of the file, indexed from 0;
* an end frame (index ``0xFF02``) carries the last data index and its bitwise
  complement and must be acknowledged with ``0xFF02``.

Status words ``0x5501``..``0x5514`` received from the device are treated as
errors and abort the transfer.
"""

import fnmatch
import os
import struct
import sys
import time

try:
    import serial
    import serial.tools.list_ports
except ImportError:
    # Keep the pure helpers (crc16, create_frame, ...) importable without
    # pyserial; main() reports the missing dependency before using it.
    serial = None

FRAME_SIZE = 16                 # payload bytes per frame
INIT_FRAME_INDEX = 0xFF01       # index of the init frame and its ACK word
END_FRAME_INDEX = 0xFF02        # index of the end frame and its ACK word
PARTITION_ERROR_CODE = 0x5501   # reported with a dedicated message at init
ERROR_CODE_FIRST = 0x5501       # first status word treated as an error
ERROR_CODE_LAST = 0x5514        # last status word treated as an error
BAUDRATE = 9600                 # adjust to match the device
INIT_DELAY = 0.04               # per-data-frame response window, in seconds
INIT_ATTEMPTS = 3               # how many times the init frame is sent
INIT_POLLS = 1000               # ~1 ms polls while waiting for the init ACK
END_POLLS = 1500                # ~1 ms polls while waiting for the end ACK


def crc16(data):
    """Return the 16-bit CRC of *data*.

    Uses initial value 0xFFFF and the reflected polynomial 0xA001, processing
    each byte least-significant bit first (the CRC-16/MODBUS parameters).
    """
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _ in range(8):
            if crc & 0x0001:
                crc = (crc >> 1) ^ 0xA001
            else:
                crc >>= 1
    return crc


def create_frame(index, data):
    """Build one wire frame: big-endian *index*, *data*, big-endian CRC16.

    The CRC is computed over the packed index followed by *data*.
    """
    body = struct.pack('>H', index) + data
    return body + struct.pack('>H', crc16(body))


def list_serial_ports():
    """Return the device names of all serial ports reported by pyserial."""
    return [port.device for port in serial.tools.list_ports.comports()]


def list_bin_files(directory):
    """Return every ``*.bin`` file under *directory*, searched recursively.

    Paths are returned relative to the current working directory.
    """
    return [
        os.path.relpath(os.path.join(dirpath, filename), start=os.getcwd())
        for dirpath, _dirnames, filenames in os.walk(directory)
        for filename in fnmatch.filter(filenames, '*.bin')
    ]


def select_option(options, prompt):
    """Print *options* as a 1-based menu and return the one the user picks.

    Exits the process with status 1 when the input is not a number or is out
    of range.
    """
    for number, option in enumerate(options, start=1):
        print(f"{number}: {option}")
    try:
        selected_index = int(input(prompt)) - 1
    except ValueError:
        # Non-numeric input is handled like any other invalid choice.
        selected_index = -1
    if not 0 <= selected_index < len(options):
        print("无效的选择。")
        sys.exit(1)
    return options[selected_index]


def _is_error_code(code):
    """Return True when *code* lies in the device error range."""
    return ERROR_CODE_FIRST <= code <= ERROR_CODE_LAST


class _StatusReader:
    """Reassemble 2-byte big-endian status words from a non-blocking port.

    The port is opened with ``timeout=0``, so a read may return fewer bytes
    than requested; partial words are buffered until the second byte arrives
    instead of being thrown away.
    """

    def __init__(self, port):
        self._port = port
        self._pending = b''

    def reset(self):
        """Drop any buffered partial word (used before re-sending a frame)."""
        self._pending = b''

    def poll(self, max_polls, accepted=()):
        """Poll for a status word, sleeping ~1 ms between reads.

        Returns the first word that is in *accepted* or in the error range,
        or None after ``max_polls + 1`` reads. Other words are discarded.
        """
        for _ in range(max_polls + 1):
            self._pending += self._port.read(2 - len(self._pending))
            if len(self._pending) == 2:
                code = int.from_bytes(self._pending, byteorder='big')
                self._pending = b''
                if code in accepted or _is_error_code(code):
                    return code
            time.sleep(0.001)
        return None


def _require_pyserial():
    """Exit with a clear message when pyserial could not be imported."""
    if serial is None:
        print("pyserial is required: pip install pyserial")
        sys.exit(1)


def _send_init_frame(port, reader, file_data):
    """Send the init frame until it is acknowledged; exit on error/timeout."""
    # The init payload is bytes 8..10 of the file, zero-padded to a frame.
    chunk = file_data[8:11].ljust(FRAME_SIZE, b'\x00')
    init_frame = create_frame(INIT_FRAME_INDEX, chunk)
    for _ in range(INIT_ATTEMPTS):
        print(f"Sent start frame: {init_frame.hex().upper()}")
        reader.reset()
        port.write(init_frame)
        code = reader.poll(INIT_POLLS, accepted=(INIT_FRAME_INDEX,))
        if code == INIT_FRAME_INDEX:
            print("Initialization response received: 0xFF01")
            return
        if code == PARTITION_ERROR_CODE:
            print("Firmware partition verification fails. Please check whether the selected firmware is correct.")
            sys.exit(1)
        if code is not None:
            print(f"Received error code: {code:04X}. Stopping file transmission.")
            sys.exit(1)
        print("Initialization response not received. Retrying...")
    sys.exit(1)


def _send_data_frames(port, reader, file_data, num_frames):
    """Send *file_data* as *num_frames* frames; return the last index sent."""
    polls_per_frame = int(1000 * INIT_DELAY)
    for index in range(num_frames):
        start = index * FRAME_SIZE
        # The final chunk is zero-padded to a full frame.
        chunk = file_data[start:start + FRAME_SIZE].ljust(FRAME_SIZE, b'\x00')
        frame = create_frame(index, chunk)
        port.write(frame)
        if index % 16 == 0:
            # Extra pause on every 16th data frame.
            time.sleep(0.02)
        # Listen for an error word; this window also paces the transfer.
        code = reader.poll(polls_per_frame)
        if code is not None:
            print(f"Received error code: {code:04X}. Stopping file transmission.")
            sys.exit(1)
        print(f"IDX:{frame[:2].hex().upper()}|DATA:{chunk.hex().upper()}|CRC:{frame[-2:].hex().upper()}")
    return num_frames - 1


def _send_end_frame(port, reader, last_index):
    """Send the end frame and wait for its ACK; exit with 1 on any failure."""
    payload = struct.pack('>HH', last_index, ~last_index & 0xFFFF)
    payload += b'\x00' * (FRAME_SIZE - len(payload))
    end_frame = create_frame(END_FRAME_INDEX, payload)
    print(f"Sent end frame: {end_frame.hex().upper()}")
    port.write(end_frame)
    code = reader.poll(END_POLLS, accepted=(END_FRAME_INDEX,))
    if code == END_FRAME_INDEX:
        print("OTA update successfully.")
        return
    if code is None:
        print("OTA update failed: timeout...")
    else:
        print(f"OTA update failed: error code:{code:04X}")
    sys.exit(1)


def main():
    """Run the interactive upload: pick port and file, then send the frames."""
    _require_pyserial()

    # List the available serial ports.
    serial_ports = list_serial_ports()
    if not serial_ports:
        print("没有找到可用的串口。")
        sys.exit(1)
    print("可用串口:")
    selected_port = select_option(serial_ports, "请选择串口序号: ")

    # List every .bin file in this script's directory and its subdirectories.
    bin_files = list_bin_files(os.path.dirname(os.path.abspath(__file__)))
    if not bin_files:
        print("没有找到任何 .bin 文件。")
        sys.exit(1)
    print("找到以下 .bin 文件:")
    selected_file = select_option(bin_files, "请选择文件序号: ")

    # Read the firmware before touching the port so a file error cannot
    # leave the port open.
    with open(selected_file, 'rb') as f:
        file_data = f.read()

    num_frames = (len(file_data) + FRAME_SIZE - 1) // FRAME_SIZE
    if num_frames == 0:
        print("Selected firmware file is empty. Nothing to send.")
        sys.exit(1)
    if num_frames > INIT_FRAME_INDEX:
        # Data indices share the 16-bit field with the control indices
        # 0xFF01/0xFF02, so they must stay below 0xFF01.
        print("Selected firmware file is too large for 16-bit frame indices.")
        sys.exit(1)

    # The context manager closes the port on every exit path.
    with serial.Serial(selected_port, BAUDRATE, timeout=0) as com_serial:
        time.sleep(2)  # wait for the serial port to settle
        reader = _StatusReader(com_serial)
        _send_init_frame(com_serial, reader, file_data)
        last_index = _send_data_frames(com_serial, reader, file_data, num_frames)
        _send_end_frame(com_serial, reader, last_index)


if __name__ == "__main__":
    main()