# Copyright 2023 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Functionalities to reliably reboot the device."""

import enum
import json
import logging
import subprocess
import time

from typing import Optional

from common import run_continuous_ffx_command, run_ffx_command, get_ssh_address
from compatible_utils import get_ssh_prefix


class TargetState(enum.Enum):
    """State of a target."""
    UNKNOWN = enum.auto()
    DISCONNECTED = enum.auto()
    PRODUCT = enum.auto()
    FASTBOOT = enum.auto()
    ZEDBOOT = enum.auto()


class BootMode(enum.Enum):
    """Specifies boot mode for device."""
    REGULAR = enum.auto()
    RECOVERY = enum.auto()
    BOOTLOADER = enum.auto()


_STATE_TO_BOOTMODE = {
    TargetState.PRODUCT: BootMode.REGULAR,
    TargetState.FASTBOOT: BootMode.BOOTLOADER,
    TargetState.ZEDBOOT: BootMode.RECOVERY
}

_BOOTMODE_TO_STATE = {value: key for key, value in _STATE_TO_BOOTMODE.items()}

# Normalized (stripped, lower-cased) 'target_state' strings and the
# TargetState each one maps to.
_STATE_STRING_TO_STATE = {
    'product': TargetState.PRODUCT,
    'zedboot (r)': TargetState.ZEDBOOT,
    'fastboot': TargetState.FASTBOOT,
    'unknown': TargetState.UNKNOWN,
    'disconnected': TargetState.DISCONNECTED,
}


class StateNotFoundError(Exception):
    """Raised when target's state cannot be found."""


class StateTransitionError(Exception):
    """Raised when target does not transition to desired state."""


def _state_string_to_state(state_str: str) -> TargetState:
    """Convert a target state string into a TargetState.

    Args:
        state_str: State string; surrounding whitespace and letter case are
            ignored.

    Returns:
        The matching TargetState.

    Raises:
        NotImplementedError: If the string is not one of the known states.
    """
    state_str = state_str.strip().lower()
    state = _STATE_STRING_TO_STATE.get(state_str)
    if state is None:
        raise NotImplementedError(f'State {state_str} not supported')
    return state


def _get_target_state(target_id: Optional[str],
                      serial_num: Optional[str],
                      num_attempts: int = 1) -> TargetState:
    """Return state of target or the default target.

    Args:
        target_id: Optional nodename of the target. If not given, default
            target is used.
        serial_num: Optional serial number of target. Only usable if device is
            in fastboot.
        num_attempts: Optional number of times to attempt getting status.

    Returns:
        TargetState of the given node, if found.

    Raises:
        StateNotFoundError: If target cannot be found, or default target is not
            defined if |target_id| is not given.
    """
    for i in range(num_attempts):
        targets = json.loads(
            run_ffx_command(cmd=('target', 'list'),
                            check=True,
                            capture_output=True,
                            json_out=True).stdout.strip())
        for target in targets:
            if target_id is None:
                if target['is_default']:
                    return _state_string_to_state(target['target_state'])
            elif target_id == target['nodename']:
                return _state_string_to_state(target['target_state'])
            # Only match on the serial when one was actually requested;
            # otherwise a target reporting a null serial would compare equal
            # to an unset |serial_num| and the wrong target would be picked.
            if serial_num is not None and serial_num == target['serial']:
                # Should only return Fastboot.
                return _state_string_to_state(target['target_state'])
        # Do not sleep for last attempt.
        if i < num_attempts - 1:
            time.sleep(10)

    # Could not find a state for given target.
    error_target = target_id
    if target_id is None:
        error_target = 'default target'

    raise StateNotFoundError(f'Could not find state for {error_target}.')


def boot_device(target_id: Optional[str],
                mode: BootMode,
                serial_num: Optional[str] = None,
                must_boot: bool = False) -> None:
    """Boot device into desired mode, with fallback to SSH on failure.

    The serial_boot_device module is tried first. If it reports failure, the
    device is rebooted with `ffx target reboot`, and if that does not reach
    the desired state, with `dm` over SSH.

    Args:
        target_id: Optional target_id of device.
        mode: Desired boot mode.
        serial_num: Optional serial number of the device; used to address the
            device while it is in fastboot.
        must_boot: Forces device to boot, regardless of current state.

    Raises:
        StateNotFoundError: When the state of the device cannot be found,
            either before rebooting or after the FFX reboot attempt.
        StateTransitionError: When final state of device is not desired.
    """
    # Avoid cycle dependency.
    # This file will be replaced with serial_boot_device quite soon; the latter
    # should be much more reliable compared to ffx target list and ssh, so
    # changing the file structure is not necessary in the current situation.
    # pylint: disable=cyclic-import, import-outside-toplevel
    # pylint: disable=wrong-import-position
    import serial_boot_device
    if serial_boot_device.boot_device(target_id, serial_num, mode, must_boot):
        return

    # Skip boot call if already in the state and not skipping check.
    state = _get_target_state(target_id, serial_num, num_attempts=3)
    wanted_state = _BOOTMODE_TO_STATE.get(mode)
    if not must_boot:
        logging.debug('Current state %s. Want state %s', state, wanted_state)
        must_boot = state != wanted_state

    if not must_boot:
        logging.debug('Skipping boot - already in good state')
        return

    def _wait_for_state_transition(current_state: TargetState):
        """Poll until the device leaves |current_state| and reaches the goal.

        Returns the last state that was successfully read, or
        TargetState.DISCONNECTED if no state was ever read. A failed lookup
        does not reset the last read state.
        """
        local_state = None
        # Check that we transition out of current state.
        for _ in range(30):
            try:
                local_state = _get_target_state(target_id, serial_num)
                if local_state != current_state:
                    # Changed states - can continue
                    break
            except StateNotFoundError:
                logging.debug('Device disconnected...')
                if current_state != TargetState.DISCONNECTED:
                    # Changed states - can continue
                    break
            finally:
                # Runs on every iteration, including the ones that break.
                time.sleep(2)
        else:
            logging.warning(
                'Device did not change from initial state. Exiting early')
            return local_state or TargetState.DISCONNECTED

        # Now we want to transition to the new state.
        for _ in range(90):
            try:
                local_state = _get_target_state(target_id, serial_num)
                if local_state == wanted_state:
                    return local_state
            except StateNotFoundError:
                logging.warning('Could not find target state.'
                                ' Sleeping then retrying...')
            finally:
                # Runs on every iteration, including the one that returns.
                time.sleep(2)
        return local_state or TargetState.DISCONNECTED

    _boot_device_ffx(target_id, serial_num, state, mode)
    state = _wait_for_state_transition(state)

    if state == TargetState.DISCONNECTED:
        raise StateNotFoundError('Target could not be found!')

    if state == wanted_state:
        return

    logging.warning(
        'Booting with FFX to %s did not succeed. Attempting with DM', mode)

    # Fall back to SSH, with no retry since we already tried with ffx.
    _boot_device_dm(target_id, serial_num, state, mode)
    state = _wait_for_state_transition(state)

    if state != wanted_state:
        raise StateTransitionError(
            f'Could not get device to desired state. Wanted {wanted_state},'
            f' got {state}')
    logging.debug('Got desired state: %s', state)


def _boot_device_ffx(target_id: Optional[str], serial_num: Optional[str],
                     current_state: TargetState, mode: BootMode):
    """Trigger a reboot into |mode| with `ffx target reboot`.

    Args:
        target_id: Optional target_id of device; used unless the device is in
            fastboot.
        serial_num: Optional serial number; used as the ffx target when the
            device is in fastboot.
        current_state: State the device is currently in.
        mode: Desired boot mode.

    Raises:
        NotImplementedError: If |mode| is not a supported BootMode.
    """
    cmd = ['target', 'reboot']
    if mode == BootMode.REGULAR:
        logging.info('Triggering regular boot')
    elif mode == BootMode.RECOVERY:
        cmd.append('-r')
    elif mode == BootMode.BOOTLOADER:
        cmd.append('-b')
    else:
        raise NotImplementedError(f'BootMode {mode} not supported')

    logging.debug('FFX reboot with command [%s]', ' '.join(cmd))
    # A device in fastboot is addressed by its serial number rather than by
    # its target id.
    if current_state == TargetState.FASTBOOT:
        ffx_target = serial_num
    else:
        ffx_target = target_id
    # TODO(crbug.com/1432405): We need to wait for the state transition or kill
    # the process if it fails.
    run_continuous_ffx_command(cmd=cmd,
                               target_id=ffx_target,
                               configs=['product.reboot.use_dm=true'])


def _boot_device_dm(target_id: Optional[str], serial_num: Optional[str],
                    current_state: TargetState, mode: BootMode):
    """Trigger a reboot into |mode| by running `dm` on the device over SSH.

    Args:
        target_id: Optional target_id of device.
        serial_num: Optional serial number of device.
        current_state: State the device is currently in.
        mode: Desired boot mode.

    Raises:
        StateTransitionError: If the device is not in regular boot and |mode|
            is BootMode.REGULAR.
        NotImplementedError: If |mode| is not a supported BootMode.
    """
    # Can only use DM if device is in regular boot.
    if current_state != TargetState.PRODUCT:
        if mode == BootMode.REGULAR:
            raise StateTransitionError('Cannot boot to Regular via DM - '
                                       'FFX already failed to do so.')
        # Boot to regular.
        # TODO(crbug.com/1432405): After changing to run_continuous_ffx_command,
        # this behavior becomes invalid, we need to wait for the state
        # transition.
        _boot_device_ffx(target_id, serial_num, current_state,
                         BootMode.REGULAR)

    ssh_prefix = get_ssh_prefix(get_ssh_address(target_id))

    if mode == BootMode.REGULAR:
        reboot_cmd = 'reboot'
    elif mode == BootMode.RECOVERY:
        reboot_cmd = 'reboot-recovery'
    elif mode == BootMode.BOOTLOADER:
        reboot_cmd = 'reboot-bootloader'
    else:
        raise NotImplementedError(f'BootMode {mode} not supported')

    # Boot commands can fail due to SSH connections timeout, so the exit
    # status is deliberately not checked.
    full_cmd = ssh_prefix + ['--', 'dm', reboot_cmd]
    logging.debug('DM reboot with command [%s]', ' '.join(full_cmd))
    subprocess.run(full_cmd, check=False)