• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env vpython3
2# Copyright 2022 The Chromium Authors
3# Use of this source code is governed by a BSD-style license that can be
4# found in the LICENSE file.
5"""Implements commands for flashing a Fuchsia device."""
6
7import argparse
8import logging
9import os
10import subprocess
11import sys
12import time
13
14from typing import Optional, Tuple
15
16import common
17from boot_device import BootMode, StateTransitionError, boot_device
18from common import get_system_info, find_image_in_sdk, \
19                   register_device_args
20from compatible_utils import get_sdk_hash, pave, running_unattended
21from lockfile import lock
22
# Lock file guarding flash operations, so that the number of flash operations
# running at once on a single host is restricted.
_FF_LOCK = os.path.join('/tmp', 'flash.lock')
# Age, in seconds, after which the lock file should be considered stale
# (15 minutes).
_FF_LOCK_STALE_SECS = 15 * 60
# Acquiring the lock waits at most as long as the stale interval.
_FF_LOCK_ACQ_TIMEOUT = _FF_LOCK_STALE_SECS
28
29
def _get_system_info(target: Optional[str],
                     serial_num: Optional[str]) -> Tuple[str, str]:
    """Queries the device for the OS it currently has installed.

    When running unattended, the device is first booted into regular mode and
    then waited on. If either step fails, a pair of empty strings is returned
    instead of querying the device.

    Args:
        target: Target to get system info of.
        serial_num: Serial number of device to get system info of.
    Returns:
        Tuple of strings, containing (product, version number). Both strings
        are empty when, while running unattended, the device could not be
        booted or the wait command reported failure.
    """

    unknown_info = ('', '')

    if not running_unattended():
        return get_system_info(target)

    # TODO(b/242191374): Remove when devices in swarming are no longer booted
    # into zedboot.
    try:
        boot_device(target, BootMode.REGULAR, serial_num)
    except (subprocess.CalledProcessError, StateTransitionError):
        logging.warning('Could not boot device. Assuming in ZEDBOOT')
        return unknown_info

    # `-t 180` is presumably a timeout in seconds for `ffx target wait` --
    # confirm against the ffx documentation.
    wait_args = ('target', 'wait', '-t', '180')
    wait_result = common.run_ffx_command(cmd=wait_args,
                                         target_id=target,
                                         check=False)
    if wait_result.returncode != 0:
        return unknown_info

    return get_system_info(target)
56
57
def update_required(
        os_check: str,
        system_image_dir: Optional[str],
        target: Optional[str],
        serial_num: Optional[str] = None) -> Tuple[bool, Optional[str]]:
    """Decides whether the device needs a system update.

    Args:
        os_check: Update policy. 'ignore' never updates, 'check' updates only
            when the SDK hash of the image differs from what the device
            reports, and any other value always updates.
        system_image_dir: Path to the image directory. If the path does not
            exist it is treated as a product-bundle name and looked up in the
            SDK. May be empty only when os_check is 'ignore'.
        target: Target to query when comparing versions.
        serial_num: Serial number of the device to query when comparing
            versions.
    Returns:
        Tuple of (whether an update is required, image directory). The image
        directory is the one found in the SDK when the given path did not
        exist, otherwise it is system_image_dir unchanged.
    Raises:
        ValueError: If os_check is not 'ignore' and no image directory was
            given.
        FileNotFoundError: If the image directory neither exists nor can be
            found in the SDK.
    """

    if os_check == 'ignore':
        return False, system_image_dir
    if not system_image_dir:
        raise ValueError('System image directory must be specified.')
    if not os.path.exists(system_image_dir):
        logging.warning(
            'System image directory does not exist. Assuming it\'s '
            'a product-bundle name and dynamically searching for '
            'image directory')
        path = find_image_in_sdk(system_image_dir)
        if not path:
            # Note the trailing space: the two literals are concatenated.
            raise FileNotFoundError(
                f'System image directory {system_image_dir} could not '
                'be found')
        system_image_dir = path
    # The device is only queried under the 'check' policy; 'update' skips the
    # comparison and always reports that an update is needed.
    if (os_check == 'check'
            and get_sdk_hash(system_image_dir) == _get_system_info(
                target, serial_num)):
        return False, system_image_dir
    return True, system_image_dir
85
86
def _run_flash_command(system_image_dir: str, target_id: Optional[str]):
    """Runs `ffx target flash` with the flash manifest in system_image_dir.

    Args:
        system_image_dir: Directory containing 'flash-manifest.manifest'.
        target_id: Identifier of the target passed through to ffx.
    """
    flash_manifest = os.path.join(system_image_dir, 'flash-manifest.manifest')
    flash_cmd = ('target', 'flash', flash_manifest, '--no-bootloader-reboot')

    ffx_configs = [
        'fastboot.usb.disabled=true',
        'ffx.fastboot.inline_target=true',
        'fastboot.reboot.reconnect_timeout=120',
    ]
    if running_unattended():
        # fxb/126212: Each file transfer times out after (file size / this
        # rate, in MB). Lowering the rate from 5 to 1 lengthens the timeout
        # in swarming, where large files can take longer to transfer.
        ffx_configs.append('fastboot.flash.timeout_rate=1')

    # Hold the flash file lock for the whole flash. Keeping multiple fastboot
    # binaries from flashing concurrently should increase the odds of
    # flashing success.
    with lock(_FF_LOCK, timeout=_FF_LOCK_ACQ_TIMEOUT):
        common.run_ffx_command(cmd=flash_cmd,
                               target_id=target_id,
                               configs=ffx_configs)
110
111
def flash(system_image_dir: str,
          target: Optional[str],
          serial_num: Optional[str] = None) -> None:
    """Flash the device.

    With a serial number, the device is first rebooted into the bootloader
    and polled via `ffx target list` until it is listed, then flashed using
    the serial number as the target id. Without one, the device is flashed
    directly using `target`.

    Args:
        system_image_dir: Path to the image directory to flash from.
        target: Target to flash; also passed to the bootloader reboot.
        serial_num: Serial number of the device, if known.
    """
    if not serial_num:
        _run_flash_command(system_image_dir, target)
        return

    poll_attempts = 10
    poll_interval_secs = 10

    boot_device(target, BootMode.BOOTLOADER, serial_num)
    for _ in range(poll_attempts):
        # Sleep before each query so the device has time to reboot.
        time.sleep(poll_interval_secs)
        if common.run_ffx_command(cmd=('target', 'list', serial_num),
                                  check=False).returncode == 0:
            break
    else:
        # Flashing is still attempted as a best effort, but make it visible
        # that the device never showed up while polling.
        logging.warning(
            'Device %s was not listed by ffx after %d attempts; attempting '
            'to flash anyway.', serial_num, poll_attempts)
    _run_flash_command(system_image_dir, serial_num)
126
127
def update(system_image_dir: str,
           os_check: str,
           target: Optional[str],
           serial_num: Optional[str] = None,
           should_pave: Optional[bool] = True) -> None:
    """Updates the given target if the update policy calls for it.

    Nothing happens when no update is required. Otherwise the device is paved
    or flashed, followed by a fixed sleep.

    Args:
        system_image_dir: string, path to image directory.
        os_check: <check|ignore|update>, which decides how to update the device.
        target: Node-name string indicating device that should be updated.
        serial_num: String of serial number of device that should be updated.
        should_pave: Optional bool on whether or not to pave or flash.
    """
    # update_required may resolve the image directory to a different path, so
    # continue with the one it returns.
    needs_update, system_image_dir = update_required(os_check,
                                                     system_image_dir, target,
                                                     serial_num)
    if not needs_update:
        return

    if not should_pave:
        flash(system_image_dir, target, serial_num)
    else:
        if running_unattended():
            assert target, ('Target ID must be specified on swarming when'
                            ' paving.')
            # TODO(crbug.com/1405525): We should check the device state
            # before and after rebooting it to avoid unnecessary reboot or
            # undesired state.
            boot_device(target, BootMode.RECOVERY, serial_num)
        try:
            pave(system_image_dir, target)
        except subprocess.TimeoutExpired:
            # Paving timed out; try flashing instead, since that may still
            # recover the device and make it usable. If flashing fails too,
            # the device is unpaveable and should be taken out of the fleet,
            # which the failure will cause.
            flash(system_image_dir, target, serial_num)

    # Always sleep after all updates.
    time.sleep(180)
168
169
def register_update_args(arg_parser: argparse.ArgumentParser,
                         default_os_check: Optional[str] = 'check',
                         default_pave: Optional[bool] = True) -> None:
    """Register common arguments for device updating.

    Adds an 'update' argument group with --system-image-dir, --serial-num,
    --os-check, --pave and --no-pave.

    Args:
        arg_parser: Parser to add the argument group to.
        default_os_check: Default value of --os-check.
        default_pave: Default value of the `pave` destination shared by
            --pave and --no-pave.
    """
    update_args = arg_parser.add_argument_group('update',
                                                'device updating arguments')
    update_args.add_argument('--system-image-dir',
                             help='Specify the directory that contains the '
                             'Fuchsia image used to pave the device. Only '
                             'needs to be specified if "os_check" is not '
                             '"ignore".')
    update_args.add_argument('--serial-num',
                             default=os.environ.get('FUCHSIA_FASTBOOT_SERNUM'),
                             help='Serial number of the device. Should be '
                             'specified for devices that do not have an image '
                             'flashed.')
    # The help text describes what update_required() does with each policy.
    update_args.add_argument('--os-check',
                             choices=['check', 'update', 'ignore'],
                             default=default_os_check,
                             help='Sets the OS version enforcement policy. If '
                             '"check", then the target device will be updated '
                             'only if its version does not match. If '
                             '"update", then the target device will always '
                             'be updated. If "ignore", then the OS version '
                             'will not be checked and the device will not be '
                             'updated.')
    update_args.add_argument('--pave',
                             action='store_true',
                             help='Performs a pave instead of a flash. '
                             'Device must already be in Zedboot')
    update_args.add_argument('--no-pave',
                             action='store_false',
                             dest='pave',
                             help='Performs a flash instead of a pave '
                             '(experimental).')
    # Both flags write to `pave`; set the shared default after registering
    # them so it overrides their individual store_true/store_false defaults.
    update_args.set_defaults(pave=default_pave)
205
206
def main():
    """Entry point for flashing a device when run as a stand-alone script."""
    arg_parser = argparse.ArgumentParser()
    register_device_args(arg_parser)
    # Stand-alone runs default to always updating, and to flashing rather
    # than paving.
    register_update_args(arg_parser,
                         default_os_check='update',
                         default_pave=False)
    parsed = arg_parser.parse_args()
    update(system_image_dir=parsed.system_image_dir,
           os_check=parsed.os_check,
           target=parsed.target_id,
           serial_num=parsed.serial_num,
           should_pave=parsed.pave)


if __name__ == '__main__':
    sys.exit(main())
219