• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python
2#
3# otatool is used to perform ota-level operations - flashing ota partition
4# erasing ota partition and switching ota partition
5#
6# Copyright 2018 Espressif Systems (Shanghai) PTE LTD
7#
8# Licensed under the Apache License, Version 2.0 (the "License");
9# you may not use this file except in compliance with the License.
10# You may obtain a copy of the License at
11#
#     http://www.apache.org/licenses/LICENSE-2.0
13#
14# Unless required by applicable law or agreed to in writing, software
15# distributed under the License is distributed on an "AS IS" BASIS,
16# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17# See the License for the specific language governing permissions and
18# limitations under the License.
19from __future__ import division, print_function
20
21import argparse
22import binascii
23import collections
24import os
25import struct
26import sys
27import tempfile
28
29try:
30    from parttool import PARTITION_TABLE_OFFSET, PartitionName, PartitionType, ParttoolTarget
31except ImportError:
32    COMPONENTS_PATH = os.path.expandvars(os.path.join('$IDF_PATH', 'components'))
33    PARTTOOL_DIR = os.path.join(COMPONENTS_PATH, 'partition_table')
34    sys.path.append(PARTTOOL_DIR)
35    from parttool import PARTITION_TABLE_OFFSET, PartitionName, PartitionType, ParttoolTarget
36
# Version string of this module.
__version__ = '2.0'

# Default for the ``spi_flash_sec_size`` argument of OtatoolTarget (the command
# line describes it as the "value of SPI_FLASH_SEC_SIZE macro"). Half of this
# value is used as the byte distance between the two otadata copies.
SPI_FLASH_SEC_SIZE = 0x2000

# When True, status() prints nothing. main() sets it from the --quiet flag.
quiet = False
42
43
def status(msg):
    """Print *msg* unless the module-level ``quiet`` flag is set."""
    if quiet:
        return
    print(msg)
47
48
class OtatoolTarget():
    """OTA-level operations on a device, built on top of ``ParttoolTarget``.

    The contents of the otadata partition are read once when the object is
    created and cached in ``self.otadata``. The cache is refreshed whenever
    this object erases or rewrites the otadata partition, so later calls on
    the same object see the state that was last written to the device.

    The otadata partition is treated as two copies, the second one starting
    ``spi_flash_sec_size >> 1`` bytes after the first. In each copy the OTA
    sequence number is the 32-bit value at offset 0 and its CRC is the 32-bit
    value at offset 28.
    """

    OTADATA_PARTITION = PartitionType('data', 'ota')

    # Byte offsets of the fields inside one otadata copy.
    _OTA_SEQ_OFFSET = 0
    _OTA_CRC_OFFSET = 28

    # Fields are (de)serialised explicitly as little-endian so the result does
    # not depend on the byte order of the host running this tool.
    _UINT32 = struct.Struct('<I')

    # Sequence value of an unusable entry (also what erased flash reads as).
    _OTA_SEQ_INVALID = 0xFFFFFFFF

    _OtadataInfo = collections.namedtuple('otadata_info', 'seq crc')

    def __init__(self, port=None, baud=None, partition_table_offset=PARTITION_TABLE_OFFSET, partition_table_file=None,
                 spi_flash_sec_size=SPI_FLASH_SEC_SIZE, esptool_args=None, esptool_write_args=None,
                 esptool_read_args=None, esptool_erase_args=None):
        """Create the target and cache the current otadata partition contents.

        Args:
            port: serial port of the device, forwarded to ``ParttoolTarget``.
            baud: baud rate, forwarded to ``ParttoolTarget``.
            partition_table_offset: offset of the partition table.
            partition_table_file: partition table file to use instead of the
                table read from the device.
            spi_flash_sec_size: value whose half is the distance between the
                two otadata copies.
            esptool_args: extra main arguments for esptool (``None`` = none).
            esptool_write_args: extra esptool write arguments (``None`` = none).
            esptool_read_args: extra esptool read arguments (``None`` = none).
            esptool_erase_args: extra esptool erase arguments (``None`` = none).
        """
        # ``None`` defaults avoid sharing one mutable list between instances.
        esptool_args = [] if esptool_args is None else esptool_args
        esptool_write_args = [] if esptool_write_args is None else esptool_write_args
        esptool_read_args = [] if esptool_read_args is None else esptool_read_args
        esptool_erase_args = [] if esptool_erase_args is None else esptool_erase_args

        self.target = ParttoolTarget(port, baud, partition_table_offset, partition_table_file, esptool_args,
                                     esptool_write_args, esptool_read_args, esptool_erase_args)
        self.spi_flash_sec_size = spi_flash_sec_size

        # Only the name of the temporary file is needed: it is closed right
        # away, filled in by read_partition() and then reopened for reading.
        temp_file = tempfile.NamedTemporaryFile(delete=False)
        temp_file.close()
        try:
            self.target.read_partition(OtatoolTarget.OTADATA_PARTITION, temp_file.name)
            with open(temp_file.name, 'rb') as f:
                self.otadata = f.read()
        finally:
            os.unlink(temp_file.name)

    def _check_otadata_partition(self):
        """Raise ``Exception`` if no otadata contents were read from the target."""
        if not self.otadata:
            raise Exception('No otadata partition found')

    def erase_otadata(self):
        """Erase the otadata partition on the target."""
        self._check_otadata_partition()
        self.target.erase_partition(OtatoolTarget.OTADATA_PARTITION)
        # Keep the cache in step with the device: erased flash reads as 0xFF.
        self.otadata = b'\xff' * len(self.otadata)

    def _get_otadata_info(self):
        """Return a list with one ``(seq, crc)`` named tuple per otadata copy."""
        copy_stride = self.spi_flash_sec_size >> 1

        info = []
        for index in range(2):
            start = index * copy_stride
            seq, = self._UINT32.unpack_from(self.otadata, start + self._OTA_SEQ_OFFSET)
            crc, = self._UINT32.unpack_from(self.otadata, start + self._OTA_CRC_OFFSET)
            info.append(self._OtadataInfo(seq, crc))

        return info

    def _get_partition_id_from_ota_id(self, ota_id):
        """Map a slot number (int) or a partition name to a parttool partition id."""
        if isinstance(ota_id, int):
            return PartitionType('app', 'ota_' + str(ota_id))
        else:
            return PartitionName(ota_id)

    def _is_otadata_info_valid(self, entry):
        """Return True if *entry* has a usable sequence number and a matching CRC."""
        seq = entry.seq % (1 << 32)
        crc = binascii.crc32(self._UINT32.pack(seq), 0xFFFFFFFF) % (1 << 32)
        return seq < self._OTA_SEQ_INVALID and entry.crc == crc

    def switch_ota_partition(self, ota_id):
        """Rewrite otadata so that the given OTA app partition is selected.

        Args:
            ota_id: slot number (int) or name (str) of the OTA app partition.

        Raises:
            Exception: if there is no otadata, no OTA app partition, the
                requested partition does not exist, or the sequence number
                would overflow.
        """
        self._check_otadata_partition()

        # Imported here because the module may only be importable after the
        # parttool import at the top of the file has extended sys.path.
        import gen_esp32part as gen

        partition_table = self.target.partition_table

        # Collect ota_0, ota_1, ... in order, stopping at the first missing
        # subtype. The result is therefore already sorted by subtype.
        ota_partitions = []
        for index in range(gen.NUM_PARTITION_SUBTYPE_APP_OTA):
            wanted_subtype = gen.MIN_PARTITION_SUBTYPE_APP_OTA + index
            found = [p for p in partition_table if p.subtype == wanted_subtype]
            if not found:
                break
            ota_partitions.append(found[0])

        if not ota_partitions:
            raise Exception('No ota app partitions found')

        # Look for the app partition to switch to
        if isinstance(ota_id, int):
            candidates = [p for p in ota_partitions
                          if p.subtype - gen.MIN_PARTITION_SUBTYPE_APP_OTA == ota_id]
        else:
            candidates = [p for p in ota_partitions if p.name == ota_id]

        if not candidates:
            raise Exception('Partition to switch to not found')
        ota_partition_next = candidates[0]

        otadata_info = self._get_otadata_info()
        first_valid = self._is_otadata_info_valid(otadata_info[0])
        second_valid = self._is_otadata_info_valid(otadata_info[1])

        # Find the copy to base the computation for ota sequence number on
        if first_valid and second_valid:
            # Both are valid, take the max as computation base
            otadata_compute_base = 0 if otadata_info[0].seq >= otadata_info[1].seq else 1
        elif first_valid:
            otadata_compute_base = 0
        elif second_valid:
            otadata_compute_base = 1
        else:
            # Both are invalid (could be initial state - all 0xFF's)
            otadata_compute_base = None

        ota_partitions_num = len(ota_partitions)
        target_seq = (ota_partition_next.subtype & 0x0F) + 1

        # Find the next ota sequence number: the smallest value that is not
        # below the base sequence and is congruent to target_seq modulo the
        # number of OTA app partitions.
        if otadata_compute_base is None:
            ota_seq_next = target_seq
        else:
            base_seq = otadata_info[otadata_compute_base].seq % (1 << 32)
            remainder = target_seq % ota_partitions_num
            if base_seq > remainder:
                rounds = (base_seq - remainder + ota_partitions_num - 1) // ota_partitions_num
            else:
                rounds = 0
            ota_seq_next = remainder + rounds * ota_partitions_num

        if ota_seq_next >= self._OTA_SEQ_INVALID:
            # Such a value would be treated as invalid or not fit in 32 bits.
            raise Exception('OTA sequence number overflow')

        # Create binary data from computed values
        ota_seq_bytes = self._UINT32.pack(ota_seq_next)
        ota_crc_bytes = self._UINT32.pack(binascii.crc32(ota_seq_bytes, 0xFFFFFFFF) % (1 << 32))

        # Overwrite the copy that was NOT used as the computation base.
        start = (1 if otadata_compute_base == 0 else 0) * (self.spi_flash_sec_size >> 1)
        seq_at = start + self._OTA_SEQ_OFFSET
        crc_at = start + self._OTA_CRC_OFFSET

        otadata_next = bytearray(self.otadata)
        otadata_next[seq_at:seq_at + 4] = ota_seq_bytes
        otadata_next[crc_at:crc_at + 4] = ota_crc_bytes
        otadata_next = bytes(otadata_next)

        temp_file = tempfile.NamedTemporaryFile(delete=False)
        try:
            with temp_file:
                temp_file.write(otadata_next)

            self.target.write_partition(OtatoolTarget.OTADATA_PARTITION, temp_file.name)
        finally:
            os.unlink(temp_file.name)

        # Only after a successful write: keep the cache in step with the device.
        self.otadata = otadata_next

    def read_ota_partition(self, ota_id, output):
        """Read the OTA app partition *ota_id* into the file named *output*."""
        self.target.read_partition(self._get_partition_id_from_ota_id(ota_id), output)

    def write_ota_partition(self, ota_id, input):
        """Write the contents of the file named *input* to OTA app partition *ota_id*."""
        self.target.write_partition(self._get_partition_id_from_ota_id(ota_id), input)

    def erase_ota_partition(self, ota_id):
        """Erase the OTA app partition *ota_id*."""
        self.target.erase_partition(self._get_partition_id_from_ota_id(ota_id))
211
212
213def _read_otadata(target):
214    target._check_otadata_partition()
215
216    otadata_info = target._get_otadata_info()
217
218    print('             {:8s} \t  {:8s} | \t  {:8s} \t   {:8s}'.format('OTA_SEQ', 'CRC', 'OTA_SEQ', 'CRC'))
219    print('Firmware:  0x{:08x} \t0x{:08x} | \t0x{:08x} \t 0x{:08x}'.format(otadata_info[0].seq, otadata_info[0].crc,
220          otadata_info[1].seq, otadata_info[1].crc))
221
222
def _erase_otadata(target):
    """Erase the otadata partition of *target*, then report it via status()."""
    done_message = 'Erased ota_data partition contents'
    target.erase_otadata()
    status(done_message)
226
227
228def _switch_ota_partition(target, ota_id):
229    target.switch_ota_partition(ota_id)
230
231
def _read_ota_partition(target, ota_id, output):
    """Read OTA partition *ota_id* of *target* into *output*, then report it."""
    target.read_ota_partition(ota_id, output)
    done_message = 'Read ota partition contents to file {}'.format(output)
    status(done_message)
235
236
def _write_ota_partition(target, ota_id, input):
    """Write file *input* to OTA partition *ota_id* of *target*, then report it."""
    target.write_ota_partition(ota_id, input)
    done_message = 'Written contents of file {} to ota partition'.format(input)
    status(done_message)
240
241
def _erase_ota_partition(target, ota_id):
    """Erase OTA partition *ota_id* of *target*, then report it via status()."""
    done_message = 'Erased contents of ota partition'
    target.erase_ota_partition(ota_id)
    status(done_message)
245
246
def main():
    """Parse the command line and run the requested OTA operation.

    Exit status:
        1 - no operation was given (help is printed unless --quiet is set).
        2 - the arguments are invalid, or --quiet is set and the operation
            raised an exception.

    Without --quiet, an exception raised by the operation propagates.
    """
    global quiet

    if sys.version_info[0] < 3:
        print('WARNING: Support for Python 2 is deprecated and will be removed in future versions.', file=sys.stderr)
    elif sys.version_info[0] == 3 and sys.version_info[1] < 6:
        print('WARNING: Python 3 versions older than 3.6 are not supported.', file=sys.stderr)

    parser = argparse.ArgumentParser('ESP-IDF OTA Partitions Tool')

    parser.add_argument('--quiet', '-q', help='suppress stderr messages', action='store_true')
    parser.add_argument('--esptool-args', help='additional main arguments for esptool', nargs='+')
    parser.add_argument('--esptool-write-args', help='additional subcommand arguments for esptool write_flash', nargs='+')
    parser.add_argument('--esptool-read-args', help='additional subcommand arguments for esptool read_flash', nargs='+')
    parser.add_argument('--esptool-erase-args', help='additional subcommand arguments for esptool erase_region', nargs='+')

    # There are two possible sources for the partition table: a device attached to the host
    # or a partition table CSV/binary file. These sources are mutually exclusive.
    parser.add_argument('--port', '-p', help='port where the device to read the partition table from is attached')

    parser.add_argument('--baud', '-b', help='baudrate to use', type=int)

    parser.add_argument('--partition-table-offset', '-o', help='offset to read the partition table from',  type=str)

    parser.add_argument('--partition-table-file', '-f',
                        help='file (CSV/binary) to read the partition table from; '
                             'overrides device attached to specified port as the partition table source when defined')

    subparsers = parser.add_subparsers(dest='operation', help='run otatool -h for additional help')

    spi_flash_sec_size = argparse.ArgumentParser(add_help=False)
    spi_flash_sec_size.add_argument('--spi-flash-sec-size', help='value of SPI_FLASH_SEC_SIZE macro', type=str)

    # Specify the supported operations
    subparsers.add_parser('read_otadata', help='read otadata partition', parents=[spi_flash_sec_size])
    subparsers.add_parser('erase_otadata', help='erase otadata partition')

    slot_or_name_parser = argparse.ArgumentParser(add_help=False)
    slot_or_name_parser_args = slot_or_name_parser.add_mutually_exclusive_group()
    slot_or_name_parser_args.add_argument('--slot', help='slot number of the ota partition', type=int)
    slot_or_name_parser_args.add_argument('--name', help='name of the ota partition')

    subparsers.add_parser('switch_ota_partition', help='switch otadata partition', parents=[slot_or_name_parser, spi_flash_sec_size])

    read_ota_partition_subparser = subparsers.add_parser('read_ota_partition', help='read contents of an ota partition', parents=[slot_or_name_parser])
    read_ota_partition_subparser.add_argument('--output', help='file to write the contents of the ota partition to', required=True)

    write_ota_partition_subparser = subparsers.add_parser('write_ota_partition', help='write contents to an ota partition', parents=[slot_or_name_parser])
    # Required, like --output above: the operation cannot work without a file.
    write_ota_partition_subparser.add_argument('--input', help='file whose contents to write to the ota partition', required=True)

    subparsers.add_parser('erase_ota_partition', help='erase contents of an ota partition', parents=[slot_or_name_parser])

    args = parser.parse_args()

    quiet = args.quiet

    # No operation specified, display help and exit
    if args.operation is None:
        if not quiet:
            parser.print_help()
        sys.exit(1)

    # --slot/--name exist only on the subparsers of operations that act on one
    # ota partition, so their presence tells whether an ota id is needed.
    needs_ota_id = hasattr(args, 'slot')
    ota_id = None
    if needs_ota_id:
        ota_id = args.name if args.name is not None else args.slot
        if ota_id is None:
            # Fail before touching the device instead of crashing later with a
            # missing-argument TypeError.
            if quiet:
                sys.exit(2)
            parser.error('one of --slot or --name is required for {}'.format(args.operation))

    # Forward only the options that were actually given, so OtatoolTarget
    # keeps its own defaults for the rest.
    target_args = {}

    for option in ('port', 'baud', 'partition_table_file', 'esptool_args',
                   'esptool_write_args', 'esptool_read_args', 'esptool_erase_args'):
        value = getattr(args, option)
        if value:
            target_args[option] = value

    if args.partition_table_offset:
        target_args['partition_table_offset'] = int(args.partition_table_offset, 0)

    # Only some subcommands define --spi-flash-sec-size.
    sec_size = getattr(args, 'spi_flash_sec_size', None)
    if sec_size:
        target_args['spi_flash_sec_size'] = int(sec_size, 0)

    target = OtatoolTarget(**target_args)

    # Create the operation table and execute the operation
    otatool_ops = {
        'read_otadata': (_read_otadata, []),
        'erase_otadata': (_erase_otadata, []),
        'switch_ota_partition': (_switch_ota_partition, []),
        'read_ota_partition': (_read_ota_partition, ['output']),
        'write_ota_partition': (_write_ota_partition, ['input']),
        'erase_ota_partition': (_erase_ota_partition, []),
    }

    (op, op_args) = otatool_ops[args.operation]

    common_args = {'target': target}
    for op_arg in op_args:
        common_args[op_arg] = getattr(args, op_arg)
    if needs_ota_id:
        common_args['ota_id'] = ota_id

    if quiet:
        # If exceptions occur, suppress and exit quietly
        try:
            op(**common_args)
        except Exception:
            sys.exit(2)
    else:
        op(**common_args)
385
386
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()
389