#!/usr/bin/env python
#
# otatool is used to perform ota-level operations - flashing ota partition
# erasing ota partition and switching ota partition
#
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division, print_function

import argparse
import binascii
import collections
import os
import struct
import sys
import tempfile

try:
    from parttool import PARTITION_TABLE_OFFSET, PartitionName, PartitionType, ParttoolTarget
except ImportError:
    # parttool is not importable as-is; fall back to its location inside the IDF tree
    COMPONENTS_PATH = os.path.expandvars(os.path.join('$IDF_PATH', 'components'))
    PARTTOOL_DIR = os.path.join(COMPONENTS_PATH, 'partition_table')
    sys.path.append(PARTTOOL_DIR)
    from parttool import PARTITION_TABLE_OFFSET, PartitionName, PartitionType, ParttoolTarget

__version__ = '2.0'

# The two otadata copies are located half of this value apart (see
# OtatoolTarget._get_otadata_info).
SPI_FLASH_SEC_SIZE = 0x2000

# When True, status() prints nothing and main() swallows operation errors.
quiet = False


def status(msg):
    """Print ``msg`` unless the module-level ``quiet`` flag is set."""
    if not quiet:
        print(msg)


class OtatoolTarget():
    """Perform OTA-level operations on a target through a ``ParttoolTarget``.

    The contents of the otadata partition are read once at construction time
    and cached in ``self.otadata``; every otadata-related method works on that
    cached copy.
    """

    OTADATA_PARTITION = PartitionType('data', 'ota')

    # Layout of one otadata copy, as used by this tool: a 32-bit sequence
    # number at offset 0 and a 32-bit CRC of that number at offset 28.
    _SEQ_OFFSET = 0
    _CRC_OFFSET = 28
    # Explicit little-endian so that decoding the flash contents (and the CRC
    # input derived from them) does not depend on the byte order of the host.
    _U32 = '<I'

    def __init__(self, port=None, baud=None, partition_table_offset=PARTITION_TABLE_OFFSET, partition_table_file=None,
                 spi_flash_sec_size=SPI_FLASH_SEC_SIZE, esptool_args=None, esptool_write_args=None,
                 esptool_read_args=None, esptool_erase_args=None):
        """Create the underlying ``ParttoolTarget`` and cache the otadata partition.

        The ``esptool_*`` arguments are lists of extra command line arguments
        forwarded to ``ParttoolTarget``; ``None`` (the default) means "no extra
        arguments".
        """
        # None instead of [] as default: a list default would be a single
        # object shared by every instance created without these arguments.
        esptool_args = [] if esptool_args is None else esptool_args
        esptool_write_args = [] if esptool_write_args is None else esptool_write_args
        esptool_read_args = [] if esptool_read_args is None else esptool_read_args
        esptool_erase_args = [] if esptool_erase_args is None else esptool_erase_args

        self.target = ParttoolTarget(port, baud, partition_table_offset, partition_table_file, esptool_args,
                                     esptool_write_args, esptool_read_args, esptool_erase_args)
        self.spi_flash_sec_size = spi_flash_sec_size

        # The file is created and closed first and only its name is handed
        # over, so that read_partition() can open it on its own.
        temp_file = tempfile.NamedTemporaryFile(delete=False)
        temp_file.close()
        try:
            self.target.read_partition(OtatoolTarget.OTADATA_PARTITION, temp_file.name)
            with open(temp_file.name, 'rb') as f:
                self.otadata = f.read()
        finally:
            os.unlink(temp_file.name)

    def _check_otadata_partition(self):
        """Raise ``Exception`` if no otadata contents were read at construction."""
        if not self.otadata:
            raise Exception('No otadata partition found')

    def erase_otadata(self):
        """Erase the otadata partition on the target."""
        self._check_otadata_partition()
        self.target.erase_partition(OtatoolTarget.OTADATA_PARTITION)

    def _get_otadata_info(self):
        """Decode both otadata copies from the cached partition contents.

        Returns a list of two ``otadata_info(seq, crc)`` named tuples; copy
        ``i`` starts at byte ``i * (spi_flash_sec_size >> 1)``.

        Raises ``struct.error`` if the cached data is too short.
        """
        otadata_info = collections.namedtuple('otadata_info', 'seq crc')

        info = []
        for i in range(2):
            start = i * (self.spi_flash_sec_size >> 1)
            seq, = struct.unpack_from(self._U32, self.otadata, start + self._SEQ_OFFSET)
            crc, = struct.unpack_from(self._U32, self.otadata, start + self._CRC_OFFSET)
            info.append(otadata_info(seq, crc))

        return info

    def _get_partition_id_from_ota_id(self, ota_id):
        """Map a slot number (``int``) or a partition name to a parttool partition id."""
        if isinstance(ota_id, int):
            return PartitionType('app', 'ota_' + str(ota_id))
        return PartitionName(ota_id)

    @staticmethod
    def _is_otadata_info_valid(info):
        """Return True if ``info.crc`` matches ``info.seq`` and the seq is not all 0xFF's."""
        seq = info.seq % (1 << 32)
        crc = binascii.crc32(struct.pack(OtatoolTarget._U32, seq), 0xFFFFFFFF) % (1 << 32)
        return seq < 0xFFFFFFFF and info.crc == crc

    def switch_ota_partition(self, ota_id):
        """Rewrite otadata so that the given ota app partition becomes selected.

        ``ota_id`` is either the slot number (``int``) or the name of the ota
        app partition to switch to.

        Raises ``Exception`` if there is no otadata, if the partition table has
        no ota app partitions, or if the requested partition is not among them.
        """
        self._check_otadata_partition()

        import gen_esp32part as gen

        partition_table = self.target.partition_table

        # Collect ota_0, ota_1, ... in order, stopping at the first missing
        # slot. The result is therefore already sorted by subtype.
        ota_partitions = []
        for i in range(gen.NUM_PARTITION_SUBTYPE_APP_OTA):
            subtype = gen.MIN_PARTITION_SUBTYPE_APP_OTA + i
            found = next((p for p in partition_table if p.subtype == subtype), None)
            if found is None:
                break
            ota_partitions.append(found)

        if not ota_partitions:
            raise Exception('No ota app partitions found')

        # Look for the app partition to switch to
        if isinstance(ota_id, int):
            candidates = (p for p in ota_partitions if p.subtype - gen.MIN_PARTITION_SUBTYPE_APP_OTA == ota_id)
        else:
            candidates = (p for p in ota_partitions if p.name == ota_id)

        ota_partition_next = next(candidates, None)
        if ota_partition_next is None:
            raise Exception('Partition to switch to not found')

        otadata_info = self._get_otadata_info()
        first_valid = self._is_otadata_info_valid(otadata_info[0])
        second_valid = self._is_otadata_info_valid(otadata_info[1])

        # Find the copy to base the computation for ota sequence number on
        if first_valid and second_valid:
            # Both are valid, take the max as computation base
            otadata_compute_base = 0 if otadata_info[0].seq >= otadata_info[1].seq else 1
        elif first_valid:
            otadata_compute_base = 0
        elif second_valid:
            otadata_compute_base = 1
        else:
            # Both are invalid (could be initial state - all 0xFF's)
            otadata_compute_base = None

        ota_partitions_num = len(ota_partitions)
        target_seq = (ota_partition_next.subtype & 0x0F) + 1

        # Find the next ota sequence number
        if otadata_compute_base is None:
            ota_seq_next = target_seq
        else:
            base_seq = otadata_info[otadata_compute_base].seq % (1 << 32)
            remainder = target_seq % ota_partitions_num

            # Smallest value of the form remainder + k * ota_partitions_num
            # (k >= 0) that is not below base_seq; k is the ceiling of
            # (base_seq - remainder) / ota_partitions_num, clamped at zero.
            if base_seq > remainder:
                rounds = (base_seq - remainder + ota_partitions_num - 1) // ota_partitions_num
            else:
                rounds = 0

            ota_seq_next = remainder + rounds * ota_partitions_num

        # Create binary data from computed values
        ota_seq_next = struct.pack(self._U32, ota_seq_next)
        ota_seq_crc_next = struct.pack(self._U32, binascii.crc32(ota_seq_next, 0xFFFFFFFF) % (1 << 32))

        # Write to the copy that was NOT used as computation base (copy 0 if
        # neither copy was valid).
        start = (1 if otadata_compute_base == 0 else 0) * (self.spi_flash_sec_size >> 1)

        temp_file = tempfile.NamedTemporaryFile(delete=False)
        temp_file.close()

        try:
            with open(temp_file.name, 'wb') as otadata_next_file:
                # Start from the current partition contents and patch only the
                # sequence number and its CRC of the chosen copy.
                otadata_next_file.write(self.otadata)

                otadata_next_file.seek(start + self._SEQ_OFFSET)
                otadata_next_file.write(ota_seq_next)

                otadata_next_file.seek(start + self._CRC_OFFSET)
                otadata_next_file.write(ota_seq_crc_next)

            # The file is closed (and thus flushed) before it is handed over
            self.target.write_partition(OtatoolTarget.OTADATA_PARTITION, temp_file.name)
        finally:
            os.unlink(temp_file.name)

    def read_ota_partition(self, ota_id, output):
        """Read the ota partition given by slot number or name into file ``output``."""
        self.target.read_partition(self._get_partition_id_from_ota_id(ota_id), output)

    def write_ota_partition(self, ota_id, input):
        """Write the contents of file ``input`` to the ota partition given by slot number or name."""
        self.target.write_partition(self._get_partition_id_from_ota_id(ota_id), input)

    def erase_ota_partition(self, ota_id):
        """Erase the ota partition given by slot number or name."""
        self.target.erase_partition(self._get_partition_id_from_ota_id(ota_id))


def _read_otadata(target):
    """Print sequence number and CRC of both otadata copies of ``target``."""
    target._check_otadata_partition()

    otadata_info = target._get_otadata_info()

    print(' {:8s} \t {:8s} | \t {:8s} \t {:8s}'.format('OTA_SEQ', 'CRC', 'OTA_SEQ', 'CRC'))
    print('Firmware: 0x{:08x} \t0x{:08x} | \t0x{:08x} \t 0x{:08x}'.format(otadata_info[0].seq, otadata_info[0].crc,
                                                                         otadata_info[1].seq, otadata_info[1].crc))


def _erase_otadata(target):
    """Erase the otadata partition and report it."""
    target.erase_otadata()
    status('Erased ota_data partition contents')


def _switch_ota_partition(target, ota_id):
    """Switch to the ota partition given by slot number or name."""
    target.switch_ota_partition(ota_id)


def _read_ota_partition(target, ota_id, output):
    """Read an ota partition into file ``output`` and report it."""
    target.read_ota_partition(ota_id, output)
    status('Read ota partition contents to file {}'.format(output))


def _write_ota_partition(target, ota_id, input):
    """Write file ``input`` to an ota partition and report it."""
    target.write_ota_partition(ota_id, input)
    status('Written contents of file {} to ota partition'.format(input))


def _erase_ota_partition(target, ota_id):
    """Erase an ota partition and report it."""
    target.erase_ota_partition(ota_id)
    status('Erased contents of ota partition')


def main():
    """Command line entry point: parse arguments and run the requested operation.

    Exits with status 1 when no operation is given, and with status 2 when the
    operation fails while ``--quiet`` is set.
    """
    global quiet

    if sys.version_info[0] < 3:
        print('WARNING: Support for Python 2 is deprecated and will be removed in future versions.', file=sys.stderr)
    elif sys.version_info[0] == 3 and sys.version_info[1] < 6:
        print('WARNING: Python 3 versions older than 3.6 are not supported.', file=sys.stderr)

    parser = argparse.ArgumentParser('ESP-IDF OTA Partitions Tool')

    parser.add_argument('--quiet', '-q', help='suppress stderr messages', action='store_true')
    parser.add_argument('--esptool-args', help='additional main arguments for esptool', nargs='+')
    parser.add_argument('--esptool-write-args', help='additional subcommand arguments for esptool write_flash', nargs='+')
    parser.add_argument('--esptool-read-args', help='additional subcommand arguments for esptool read_flash', nargs='+')
    parser.add_argument('--esptool-erase-args', help='additional subcommand arguments for esptool erase_region', nargs='+')

    # There are two possible sources for the partition table: a device attached to the host
    # or a partition table CSV/binary file. These sources are mutually exclusive.
    parser.add_argument('--port', '-p', help='port where the device to read the partition table from is attached')

    parser.add_argument('--baud', '-b', help='baudrate to use', type=int)

    parser.add_argument('--partition-table-offset', '-o', help='offset to read the partition table from', type=str)

    parser.add_argument('--partition-table-file', '-f',
                        help='file (CSV/binary) to read the partition table from; '
                             'overrides device attached to specified port as the partition table source when defined')

    subparsers = parser.add_subparsers(dest='operation', help='run otatool -h for additional help')

    spi_flash_sec_size = argparse.ArgumentParser(add_help=False)
    spi_flash_sec_size.add_argument('--spi-flash-sec-size', help='value of SPI_FLASH_SEC_SIZE macro', type=str)

    # Specify the supported operations
    subparsers.add_parser('read_otadata', help='read otadata partition', parents=[spi_flash_sec_size])
    subparsers.add_parser('erase_otadata', help='erase otadata partition')

    # Operations on a single ota partition need to know which one: let
    # argparse reject a call that gives neither --slot nor --name instead of
    # failing later on a missing ota_id argument.
    slot_or_name_parser = argparse.ArgumentParser(add_help=False)
    slot_or_name_parser_args = slot_or_name_parser.add_mutually_exclusive_group(required=True)
    slot_or_name_parser_args.add_argument('--slot', help='slot number of the ota partition', type=int)
    slot_or_name_parser_args.add_argument('--name', help='name of the ota partition')

    subparsers.add_parser('switch_ota_partition', help='switch otadata partition',
                          parents=[slot_or_name_parser, spi_flash_sec_size])

    read_ota_partition_subparser = subparsers.add_parser('read_ota_partition', help='read contents of an ota partition',
                                                         parents=[slot_or_name_parser])
    read_ota_partition_subparser.add_argument('--output', help='file to write the contents of the ota partition to',
                                              required=True)

    write_ota_partition_subparser = subparsers.add_parser('write_ota_partition', help='write contents to an ota partition',
                                                          parents=[slot_or_name_parser])
    # Required, like --output above: there is nothing to write without it
    write_ota_partition_subparser.add_argument('--input', help='file whose contents to write to the ota partition',
                                               required=True)

    subparsers.add_parser('erase_ota_partition', help='erase contents of an ota partition', parents=[slot_or_name_parser])

    args = parser.parse_args()

    quiet = args.quiet

    # No operation specified, display help and exit
    if args.operation is None:
        if not quiet:
            parser.print_help()
        sys.exit(1)

    # Only forward the options that were actually given, so that the
    # OtatoolTarget defaults apply to the rest.
    target_args = {}

    for option in ('port', 'partition_table_file', 'esptool_args', 'esptool_write_args',
                   'esptool_read_args', 'esptool_erase_args', 'baud'):
        value = getattr(args, option)
        if value:
            target_args[option] = value

    if args.partition_table_offset:
        target_args['partition_table_offset'] = int(args.partition_table_offset, 0)

    # --spi-flash-sec-size only exists for some of the operations
    sec_size = getattr(args, 'spi_flash_sec_size', None)
    if sec_size:
        target_args['spi_flash_sec_size'] = int(sec_size, 0)

    target = OtatoolTarget(**target_args)

    # Create the operation table and execute the operation
    otatool_ops = {
        'read_otadata': (_read_otadata, []),
        'erase_otadata': (_erase_otadata, []),
        'switch_ota_partition': (_switch_ota_partition, ['ota_id']),
        'read_ota_partition': (_read_ota_partition, ['ota_id', 'output']),
        'write_ota_partition': (_write_ota_partition, ['ota_id', 'input']),
        'erase_ota_partition': (_erase_ota_partition, ['ota_id']),
    }

    (op, op_args) = otatool_ops[args.operation]

    available_args = dict(vars(args))

    # --name and --slot (present only for per-partition operations) both end
    # up in the single ota_id argument of the operation.
    ota_name = getattr(args, 'name', None)
    ota_slot = getattr(args, 'slot', None)
    if ota_name is not None:
        available_args['ota_id'] = ota_name
    elif ota_slot is not None:
        available_args['ota_id'] = ota_slot

    common_args = {'target': target}
    for op_arg in op_args:
        common_args[op_arg] = available_args[op_arg]

    if quiet:
        # If exceptions occur, suppress and exit quietly
        try:
            op(**common_args)
        except Exception:
            sys.exit(2)
    else:
        op(**common_args)


if __name__ == '__main__':
    main()