#!/usr/bin/env vpython3
# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Implements commands for flashing a Fuchsia device."""

import argparse
import logging
import os
import subprocess
import sys
import time

from typing import Optional, Tuple

import common
from boot_device import BootMode, StateTransitionError, boot_device
from common import get_system_info, find_image_in_sdk, \
    register_device_args
from compatible_utils import get_sdk_hash, pave, running_unattended
from lockfile import lock

# Flash-file lock. Used to restrict number of flash operations per host.
# File lock should be marked as stale after 15 mins.
_FF_LOCK = os.path.join('/tmp', 'flash.lock')
_FF_LOCK_STALE_SECS = 60 * 15
_FF_LOCK_ACQ_TIMEOUT = _FF_LOCK_STALE_SECS


def _get_system_info(target: Optional[str],
                     serial_num: Optional[str]) -> Tuple[str, str]:
    """Retrieves installed OS version from device.

    When running unattended, the device is first asked to boot into
    BootMode.REGULAR. If that fails, the device is assumed to be sitting in
    fastboot and a pair of empty strings is returned instead of querying it.

    Args:
        target: Target to get system info of.
        serial_num: Serial number of device to get system info of.
    Returns:
        Tuple of strings, containing (product, version number).
    """

    if running_unattended():
        try:
            boot_device(target, BootMode.REGULAR, serial_num)
        except (subprocess.CalledProcessError, StateTransitionError):
            logging.warning('Could not boot device. Assuming in fastboot')
            return ('', '')

    return get_system_info(target)


def update_required(
        os_check: str,
        system_image_dir: Optional[str],
        target: Optional[str],
        serial_num: Optional[str] = None) -> Tuple[bool, Optional[str]]:
    """Returns True if a system update is required and path to image dir.

    Args:
        os_check: OS version enforcement policy. 'ignore' never updates;
            'check' updates only when the SDK hash of the image differs from
            what the device reports; any other value always updates.
        system_image_dir: Path to the image directory. If the path does not
            exist it is treated as a product-bundle name and looked up with
            find_image_in_sdk().
        target: Target to query for its installed system info.
        serial_num: Serial number of the device to query.
    Returns:
        Tuple of (whether an update is required, image directory to use).
        With 'ignore' the directory is returned exactly as passed in.
    Raises:
        ValueError: No image directory was given and os_check is not
            'ignore'.
        FileNotFoundError: The image directory does not exist and could not
            be resolved through the SDK either.
    """

    if os_check == 'ignore':
        return False, system_image_dir
    if not system_image_dir:
        raise ValueError('System image directory must be specified.')
    if not os.path.exists(system_image_dir):
        logging.warning(
            'System image directory does not exist. Assuming it\'s '
            'a product-bundle name and dynamically searching for '
            'image directory')
        path = find_image_in_sdk(system_image_dir)
        if not path:
            # The trailing space matters: the two literals are concatenated.
            raise FileNotFoundError(
                f'System image directory {system_image_dir} could not '
                'be found')
        system_image_dir = path
    if (os_check == 'check'
            and get_sdk_hash(system_image_dir) == _get_system_info(
                target, serial_num)):
        return False, system_image_dir
    return True, system_image_dir


def _run_flash_command(system_image_dir: str, target_id: Optional[str]):
    """Helper function for running `ffx target flash`.

    Args:
        system_image_dir: Directory containing 'flash-manifest.manifest'.
        target_id: Identifier handed to ffx to select the device.
    """
    manifest = os.path.join(system_image_dir, 'flash-manifest.manifest')
    configs = [
        'fastboot.usb.disabled=true',
        'ffx.fastboot.inline_target=true',
        'fastboot.reboot.reconnect_timeout=120',
    ]
    if running_unattended():
        # fxb/126212: The timeout rate determines the timeout for each file
        # transfer based on the size of the file / this rate (in MB).
        # Decreasing the rate to 1 (from 5) increases the timeout in swarming,
        # where large files can take longer to transfer.
        configs.append('fastboot.flash.timeout_rate=1')

    # Flash only with a file lock acquired.
    # This prevents multiple fastboot binaries from flashing concurrently,
    # which should increase the odds of flashing success.
    with lock(_FF_LOCK, timeout=_FF_LOCK_ACQ_TIMEOUT):
        common.run_ffx_command(cmd=('target', 'flash', manifest,
                                    '--no-bootloader-reboot'),
                               target_id=target_id,
                               configs=configs)


def flash(system_image_dir: str,
          target: Optional[str],
          serial_num: Optional[str] = None) -> None:
    """Flash the device.

    With a serial number, the device is rebooted into the bootloader, then
    `ffx target list <serial_num>` is polled up to 10 times, 10 seconds
    apart, before flashing with the serial number as the target id. Without
    a serial number, the device is flashed directly using |target|.

    Args:
        system_image_dir: Directory containing the flash manifest.
        target: Node-name of the device to flash.
        serial_num: Serial number of the device to flash, if known.
    """
    if serial_num:
        boot_device(target, BootMode.BOOTLOADER, serial_num)
        for _ in range(10):
            time.sleep(10)
            if common.run_ffx_command(cmd=('target', 'list', serial_num),
                                      check=False).returncode == 0:
                break
        else:
            # Flashing is still attempted as a best effort, but make the
            # exhausted wait visible in the logs instead of silently
            # falling through.
            logging.warning(
                '`ffx target list %s` never succeeded after rebooting to '
                'the bootloader. Attempting to flash anyway.', serial_num)
        _run_flash_command(system_image_dir, serial_num)
    else:
        _run_flash_command(system_image_dir, target)


def update(system_image_dir: str,
           os_check: str,
           target: Optional[str],
           serial_num: Optional[str] = None,
           should_pave: Optional[bool] = True) -> None:
    """Conditionally updates target given.

    Args:
        system_image_dir: string, path to image directory.
        os_check: <check|ignore|update>, which decides how to update the device.
        target: Node-name string indicating device that should be updated.
        serial_num: String of serial number of device that should be updated.
        should_pave: Optional bool on whether or not to pave or flash.
    """
    needs_update, actual_image_dir = update_required(os_check,
                                                     system_image_dir, target,
                                                     serial_num)

    # update_required() may have resolved a product-bundle name into a real
    # directory; use the resolved path from here on.
    system_image_dir = actual_image_dir
    if needs_update:
        if should_pave:
            if running_unattended():
                assert target, ('Target ID must be specified on swarming when'
                                ' paving.')
                # TODO(crbug.com/1405525): We should check the device state
                # before and after rebooting it to avoid unnecessary reboot or
                # undesired state.
                boot_device(target, BootMode.RECOVERY, serial_num)
            try:
                pave(system_image_dir, target)
            except subprocess.TimeoutExpired:
                # Fallback to flashing, just in case it might work.
                # This could recover the device and make it usable.
                # If it fails, device is unpaveable anyway, and should be taken
                # out of fleet - this will do that.
                flash(system_image_dir, target, serial_num)
        else:
            flash(system_image_dir, target, serial_num)
        # Always sleep after all updates.
        time.sleep(180)


def register_update_args(arg_parser: argparse.ArgumentParser,
                         default_os_check: Optional[str] = 'check',
                         default_pave: Optional[bool] = True) -> None:
    """Register common arguments for device updating.

    Args:
        arg_parser: Parser that receives a new 'update' argument group.
        default_os_check: Default value for --os-check.
        default_pave: Default for the `pave` destination shared by --pave
            and --no-pave.
    """
    serve_args = arg_parser.add_argument_group('update',
                                               'device updating arguments')
    serve_args.add_argument('--system-image-dir',
                            help='Specify the directory that contains the '
                            'Fuchsia image used to pave the device. Only '
                            'needs to be specified if "os_check" is not '
                            '"ignore".')
    serve_args.add_argument('--serial-num',
                            default=os.environ.get('FUCHSIA_FASTBOOT_SERNUM'),
                            help='Serial number of the device. Should be '
                            'specified for devices that do not have an image '
                            'flashed.')
    # NOTE(review): within this file "check" triggers an update when the
    # versions differ rather than halting; confirm whether the help text
    # below is still accurate.
    serve_args.add_argument('--os-check',
                            choices=['check', 'update', 'ignore'],
                            default=default_os_check,
                            help='Sets the OS version enforcement policy. If '
                            '"check", then the deployment process will halt '
                            'if the target\'s version does not match. If '
                            '"update", then the target device will '
                            'be reflashed. If "ignore", then the OS version '
                            'will not be checked.')
    serve_args.add_argument('--pave',
                            action='store_true',
                            help='Performs a pave instead of a flash. '
                            'Device must already be in Zedboot')
    serve_args.add_argument('--no-pave',
                            action='store_false',
                            dest='pave',
                            help='Performs a flash instead of a pave '
                            '(experimental).')
    # Both flags write to `pave`; set the shared default once here.
    serve_args.set_defaults(pave=default_pave)


def main():
    """Stand-alone function for flashing a device.

    Parses device and update arguments (defaulting to --os-check=update and
    flashing rather than paving) and runs update() with them.
    """
    parser = argparse.ArgumentParser()
    register_device_args(parser)
    register_update_args(parser, default_os_check='update', default_pave=False)
    args = parser.parse_args()
    update(args.system_image_dir, args.os_check, args.target_id,
           args.serial_num, args.pave)


if __name__ == '__main__':
    sys.exit(main())