#!/usr/bin/env vpython3
# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Implements commands for flashing a Fuchsia device."""

import argparse
import logging
import os
import subprocess
import sys
import time

from typing import Optional, Tuple

import common
from boot_device import BootMode, StateTransitionError, boot_device
from common import get_system_info, find_image_in_sdk, \
    register_device_args
from compatible_utils import get_sdk_hash, pave, running_unattended
from lockfile import lock

# Flash-file lock. Used to restrict number of flash operations per host.
# File lock should be marked as stale after 15 mins.
_FF_LOCK = os.path.join('/tmp', 'flash.lock')
_FF_LOCK_STALE_SECS = 60 * 15
_FF_LOCK_ACQ_TIMEOUT = _FF_LOCK_STALE_SECS


def _get_system_info(target: Optional[str],
                     serial_num: Optional[str]) -> Tuple[str, str]:
    """Retrieves installed OS version from device.

    When running unattended, the device is first booted into regular mode
    and waited on; if either step fails, an empty result is returned instead
    of querying the device.

    Args:
        target: Target to get system info of.
        serial_num: Serial number of device to get system info of.
    Returns:
        Tuple of strings, containing (product, version number). Both strings
        are empty when running unattended and the device could not be booted
        or did not become reachable.
    """

    # TODO(b/242191374): Remove when devices in swarming are no longer booted
    # into zedboot.
    if running_unattended():
        try:
            boot_device(target, BootMode.REGULAR, serial_num)
        except (subprocess.CalledProcessError, StateTransitionError):
            logging.warning('Could not boot device. Assuming in ZEDBOOT')
            return ('', '')
        # check=False: a failed wait is reported via the return code rather
        # than an exception, and is treated as "no system info available".
        wait_cmd = common.run_ffx_command(cmd=('target', 'wait', '-t', '180'),
                                          target_id=target,
                                          check=False)
        if wait_cmd.returncode != 0:
            return ('', '')

    return get_system_info(target)


def update_required(
        os_check: str,
        system_image_dir: Optional[str],
        target: Optional[str],
        serial_num: Optional[str] = None) -> Tuple[bool, Optional[str]]:
    """Returns True if a system update is required and path to image dir.

    Args:
        os_check: <check|ignore|update>, the OS version enforcement policy.
        system_image_dir: Path to the image directory. If the path does not
            exist, it is passed to find_image_in_sdk() to look up the image
            directory.
        target: Target to check the installed system of.
        serial_num: Serial number of the device to check.
    Returns:
        Tuple of (update required, image directory). The image directory is
        the resolved path when a lookup via find_image_in_sdk() was needed,
        and the unmodified input when |os_check| is 'ignore'.
    Raises:
        ValueError: |system_image_dir| is empty and |os_check| is not
            'ignore'.
        FileNotFoundError: |system_image_dir| does not exist and
            find_image_in_sdk() returned nothing for it.
    """

    if os_check == 'ignore':
        return False, system_image_dir
    if not system_image_dir:
        raise ValueError('System image directory must be specified.')
    if not os.path.exists(system_image_dir):
        logging.warning(
            'System image directory does not exist. Assuming it\'s '
            'a product-bundle name and dynamically searching for '
            'image directory')
        path = find_image_in_sdk(system_image_dir)
        if not path:
            # The leading space in the second literal keeps the implicitly
            # concatenated message from reading "could notbe found".
            raise FileNotFoundError(
                f'System image directory {system_image_dir} could not'
                ' be found')
        system_image_dir = path
    # Only in 'check' mode can a matching installed version skip the update;
    # 'update' always reports that an update is required.
    if (os_check == 'check'
            and get_sdk_hash(system_image_dir) == _get_system_info(
                target, serial_num)):
        return False, system_image_dir
    return True, system_image_dir


def _run_flash_command(system_image_dir: str, target_id: Optional[str]):
    """Helper function for running `ffx target flash`.

    Args:
        system_image_dir: Directory containing 'flash-manifest.manifest'.
        target_id: Identifier passed to ffx as the target to flash.
    """
    manifest = os.path.join(system_image_dir, 'flash-manifest.manifest')
    configs = [
        'fastboot.usb.disabled=true',
        'ffx.fastboot.inline_target=true',
        'fastboot.reboot.reconnect_timeout=120',
    ]
    if running_unattended():
        # fxb/126212: The timeout rate determines the timeout for each file
        # transfer based on the size of the file / this rate (in MB).
        # Decreasing the rate to 1 (from 5) increases the timeout in swarming,
        # where large files can take longer to transfer.
        configs.append('fastboot.flash.timeout_rate=1')

    # Flash only with a file lock acquired.
    # This prevents multiple fastboot binaries from flashing concurrently,
    # which should increase the odds of flashing success.
    with lock(_FF_LOCK, timeout=_FF_LOCK_ACQ_TIMEOUT):
        common.run_ffx_command(cmd=('target', 'flash', manifest,
                                    '--no-bootloader-reboot'),
                               target_id=target_id,
                               configs=configs)


def flash(system_image_dir: str,
          target: Optional[str],
          serial_num: Optional[str] = None) -> None:
    """Flash the device.

    Args:
        system_image_dir: Directory containing the flash manifest.
        target: Node-name of the device; used as the flash target when no
            serial number is given.
        serial_num: Serial number of the device. When set, the device is
            first put into bootloader mode and then flashed by serial number.
    """
    if serial_num:
        boot_device(target, BootMode.BOOTLOADER, serial_num)
        # Poll `ffx target list` until it succeeds for the serial number,
        # sleeping before each of up to 10 attempts.
        for _ in range(10):
            time.sleep(10)
            if common.run_ffx_command(cmd=('target', 'list', serial_num),
                                      check=False).returncode == 0:
                break
        else:
            # Best effort: still try to flash, but leave a trace that the
            # device never showed up so a later failure is easier to diagnose.
            logging.warning(
                'Device %s was not listed by ffx after rebooting into the '
                'bootloader. Attempting to flash anyway.', serial_num)
        _run_flash_command(system_image_dir, serial_num)
    else:
        _run_flash_command(system_image_dir, target)


def update(system_image_dir: str,
           os_check: str,
           target: Optional[str],
           serial_num: Optional[str] = None,
           should_pave: Optional[bool] = True) -> None:
    """Conditionally updates target given.

    Args:
        system_image_dir: string, path to image directory.
        os_check: <check|ignore|update>, which decides how to update the device.
        target: Node-name string indicating device that should be updated.
        serial_num: String of serial number of device that should be updated.
        should_pave: Optional bool on whether or not to pave or flash.
    """
    needs_update, actual_image_dir = update_required(os_check,
                                                     system_image_dir, target,
                                                     serial_num)

    # update_required() may have resolved the given name to a real directory.
    system_image_dir = actual_image_dir
    if needs_update:
        if should_pave:
            if running_unattended():
                assert target, ('Target ID must be specified on swarming when'
                                ' paving.')
                # TODO(crbug.com/1405525): We should check the device state
                # before and after rebooting it to avoid unnecessary reboot or
                # undesired state.
                boot_device(target, BootMode.RECOVERY, serial_num)
            try:
                pave(system_image_dir, target)
            except subprocess.TimeoutExpired:
                # Fallback to flashing, just in case it might work.
                # This could recover the device and make it usable.
                # If it fails, device is unpaveable anyway, and should be taken
                # out of fleet - this will do that.
                flash(system_image_dir, target, serial_num)
        else:
            flash(system_image_dir, target, serial_num)
        # Always sleep after all updates.
        time.sleep(180)


def register_update_args(arg_parser: argparse.ArgumentParser,
                         default_os_check: Optional[str] = 'check',
                         default_pave: Optional[bool] = True) -> None:
    """Register common arguments for device updating.

    Args:
        arg_parser: Parser to add the 'update' argument group to.
        default_os_check: Default value of --os-check.
        default_pave: Default of the 'pave' destination shared by --pave and
            --no-pave.
    """
    serve_args = arg_parser.add_argument_group('update',
                                               'device updating arguments')
    serve_args.add_argument('--system-image-dir',
                            help='Specify the directory that contains the '
                            'Fuchsia image used to pave the device. Only '
                            'needs to be specified if "os_check" is not '
                            '"ignore".')
    serve_args.add_argument('--serial-num',
                            default=os.environ.get('FUCHSIA_FASTBOOT_SERNUM'),
                            help='Serial number of the device. Should be '
                            'specified for devices that do not have an image '
                            'flashed.')
    serve_args.add_argument('--os-check',
                            choices=['check', 'update', 'ignore'],
                            default=default_os_check,
                            help='Sets the OS version enforcement policy. If '
                            '"check", then the deployment process will halt '
                            'if the target\'s version does not match. If '
                            '"update", then the target device will '
                            'be reflashed. If "ignore", then the OS version '
                            'will not be checked.')
    # --pave and --no-pave write to the same 'pave' destination; the default
    # is set once below via set_defaults().
    serve_args.add_argument('--pave',
                            action='store_true',
                            help='Performs a pave instead of a flash. '
                            'Device must already be in Zedboot')
    serve_args.add_argument('--no-pave',
                            action='store_false',
                            dest='pave',
                            help='Performs a flash instead of a pave '
                            '(experimental).')
    serve_args.set_defaults(pave=default_pave)


def main():
    """Stand-alone function for flashing a device."""
    parser = argparse.ArgumentParser()
    register_device_args(parser)
    # Stand-alone runs default to --os-check=update and flashing (no pave).
    register_update_args(parser, default_os_check='update', default_pave=False)
    args = parser.parse_args()
    update(args.system_image_dir, args.os_check, args.target_id,
           args.serial_num, args.pave)


if __name__ == '__main__':
    sys.exit(main())