# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Functions used in both v1 and v2 scripts."""

import os
import platform
import re
import stat
import subprocess

from typing import Iterable, List, Optional, Tuple


# File indicating version of an image downloaded to the host
_BUILD_ARGS = "buildargs.gn"
_ARGS_FILE = 'args.gn'

_FILTER_DIR = 'testing/buildbot/filters'
_SSH_KEYS = os.path.expanduser('~/.ssh/fuchsia_authorized_keys')


class VersionNotFoundError(Exception):
    """Thrown when version info cannot be retrieved from device."""


def get_ssh_keys() -> str:
    """Returns path of Fuchsia ssh keys."""

    return _SSH_KEYS


def running_unattended() -> bool:
    """Returns true if running non-interactively.

    When running unattended, confirmation prompts and the like are suppressed.
    Detection is based on the presence of the SWARMING_SERVER environment
    variable.
    """

    # TODO(crbug/1401387): Change to mixin based approach.
    return 'SWARMING_SERVER' in os.environ


def get_host_arch() -> str:
    """Retrieve CPU architecture of the host machine.

    Returns:
        'x64' or 'arm64'.

    Raises:
        NotImplementedError: if platform.machine() reports anything else.
    """
    host_arch = platform.machine()
    # platform.machine() returns AMD64 on 64-bit Windows.
    if host_arch in ('x86_64', 'AMD64'):
        return 'x64'
    if host_arch in ('aarch64', 'arm64'):
        return 'arm64'
    raise NotImplementedError('Unsupported host architecture: %s' % host_arch)


def add_exec_to_file(file: str) -> None:
    """Add execution bits to a file.

    Only the owner-execute bit (S_IXUSR) is added; all existing mode bits are
    preserved.

    Args:
        file: path to the file.
    """
    file_stat = os.stat(file)
    os.chmod(file, file_stat.st_mode | stat.S_IXUSR)


def _add_exec_to_pave_binaries(system_image_dir: str) -> None:
    """Add exec to required pave files.

    The pave files may vary depending if a product-bundle or a prebuilt images
    directory is being used.

    Args:
        system_image_dir: string path to the directory containing the pave
            files.
    """
    host_arch = get_host_arch()
    pb_files = ['pave.sh', os.path.join(f'host_{host_arch}', 'bootserver')]
    image_files = ['pave.sh', f'bootserver.exe.linux-{host_arch}']
    # The product-bundle layout is recognised by the presence of its
    # bootserver binary; otherwise fall back to the prebuilt-images layout.
    use_pb_files = os.path.exists(os.path.join(system_image_dir, pb_files[1]))
    for f in pb_files if use_pb_files else image_files:
        add_exec_to_file(os.path.join(system_image_dir, f))


def pave(image_dir: str, target_id: Optional[str])\
        -> subprocess.CompletedProcess:
    """Pave a device using the pave script inside |image_dir|.

    Args:
        image_dir: directory containing pave.sh and the bootserver binary.
        target_id: if truthy, forwarded to pave.sh via its '-n' flag.

    Returns:
        The CompletedProcess of the pave.sh invocation.

    Raises:
        subprocess.CalledProcessError: if pave.sh exits with a non-zero code.
        subprocess.TimeoutExpired: if pave.sh runs longer than 300 seconds.
    """
    _add_exec_to_pave_binaries(image_dir)
    pave_command = [
        os.path.join(image_dir, 'pave.sh'), '--authorized-keys',
        get_ssh_keys(), '-1'
    ]
    if target_id:
        pave_command.extend(['-n', target_id])
    return subprocess.run(pave_command, check=True, text=True, timeout=300)


def parse_host_port(host_port_pair: str) -> Tuple[str, int]:
    """Parses a host name or IP address and a port number from a string of
    any of the following forms:
    - hostname:port
    - IPv4addy:port
    - [IPv6addy]:port

    Returns:
        A tuple of the string host name/address and integer port number.

    Raises:
        ValueError if `host_port_pair` does not contain a colon or if the
        substring following the last colon cannot be converted to an int.
    """

    host, separator, port = host_port_pair.rpartition(':')
    if not separator:
        raise ValueError('Expected "host:port" but got %r' % host_port_pair)

    # Strip the brackets if the host looks like an IPv6 address.
    if len(host) >= 4 and host[0] == '[' and host[-1] == ']':
        host = host[1:-1]
    return (host, int(port))


def get_ssh_prefix(host_port_pair: str) -> List[str]:
    """Get the prefix of a barebone ssh command.

    The command uses the 'sshconfig' file located next to this module.
    """

    ssh_addr, ssh_port = parse_host_port(host_port_pair)
    sshconfig = os.path.join(os.path.dirname(__file__), 'sshconfig')
    return ['ssh', '-F', sshconfig, ssh_addr, '-p', str(ssh_port)]


def install_symbols(package_paths: Iterable[str],
                    fuchsia_out_dir: str) -> None:
    """Installs debug symbols for a package into the GDB-standard symbol
    directory located in fuchsia_out_dir.

    For every package path, the 'ids.txt' file next to it is read. Each
    non-blank line is '<build_id> <binary path relative to ids.txt>', and a
    relative symlink '.build-id/<id[:2]>/<id[2:]>.debug' pointing at the
    binary is (re)created under |fuchsia_out_dir|.

    Args:
        package_paths: paths of packages whose sibling ids.txt is processed.
        fuchsia_out_dir: directory that holds the '.build-id' symbol root.

    Raises:
        ValueError: if a non-blank ids.txt line has no whitespace separator.
    """

    symbol_root = os.path.join(fuchsia_out_dir, '.build-id')
    for path in package_paths:
        package_dir = os.path.dirname(path)
        ids_txt_path = os.path.join(package_dir, 'ids.txt')
        with open(ids_txt_path, 'r') as f:
            for entry in f:
                entry = entry.strip()
                if not entry:
                    # Tolerate blank (e.g. trailing) lines.
                    continue
                # Split once only so binary paths containing spaces survive.
                build_id, binary_relpath = entry.split(maxsplit=1)
                binary_abspath = os.path.abspath(
                    os.path.join(package_dir, binary_relpath))
                symbol_dir = os.path.join(symbol_root, build_id[:2])
                symbol_file = os.path.join(symbol_dir, build_id[2:] + '.debug')
                os.makedirs(symbol_dir, exist_ok=True)

                # lexists() is also true for dangling symlinks.
                if os.path.lexists(symbol_file):
                    # Clobber the existing entry to ensure that the symlink's
                    # target is up to date.
                    os.unlink(symbol_file)
                os.symlink(os.path.relpath(binary_abspath, symbol_dir),
                           symbol_file)


# TODO(crbug.com/1279803): Until one can send files to the device when running
#                          a test, filter files must be read from the test
#                          package.
def map_filter_file_to_package_file(filter_file: str) -> str:
    """Returns the path to |filter_file| within the test component's package.

    Raises:
        ValueError: if |filter_file| is not located under the registered
            filter directory (testing/buildbot/filters).
    """

    if _FILTER_DIR not in filter_file:
        raise ValueError('CFv2 tests only support registered filter files '
                         'present in the test package')
    return '/pkg/' + filter_file[filter_file.index(_FILTER_DIR):]


def get_sdk_hash(system_image_dir: str) -> Tuple[str, str]:
    """Read version of hash in pre-installed package directory.

    Looks for buildargs.gn first and falls back to args.gn.

    Returns:
        Tuple of (product, version) of image to be installed.
    Raises:
        VersionNotFoundError: if contents of buildargs.gn cannot be found or the
        version number cannot be extracted.
    """

    # TODO(crbug.com/1261961): Stop processing buildargs.gn directly.
    args_file = os.path.join(system_image_dir, _BUILD_ARGS)
    if not os.path.exists(args_file):
        args_file = os.path.join(system_image_dir, _ARGS_FILE)

    if not os.path.exists(args_file):
        raise VersionNotFoundError(
            f'Dir {system_image_dir} did not contain {_BUILD_ARGS} or '
            f'{_ARGS_FILE}')

    with open(args_file) as f:
        contents = f.readlines()
    if not contents:
        raise VersionNotFoundError('Could not retrieve %s' % args_file)

    version_key = 'build_info_version'
    product_key = 'build_info_product'
    # Compile once instead of once per (line, key) pair.
    patterns = {
        key: re.compile(r'%s = "(.*)"' % key)
        for key in (product_key, version_key)
    }
    version_info = {}
    for line in contents:
        for key, pattern in patterns.items():
            match = pattern.match(line)
            if match:
                # A later assignment of the same key wins.
                version_info[key] = match.group(1)
    if not (version_key in version_info and product_key in version_info):
        raise VersionNotFoundError(
            'Could not extract version info from %s. Contents: %s' %
            (args_file, contents))

    return (version_info[product_key], version_info[version_key])