• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2022 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4"""Functions used in both v1 and v2 scripts."""
5
6import os
7import platform
8import re
9import stat
10import subprocess
11
12from typing import Iterable, List, Optional, Tuple
13
14
15# File indicating version of an image downloaded to the host
16_BUILD_ARGS = "buildargs.gn"
17_ARGS_FILE = 'args.gn'
18
19_FILTER_DIR = 'testing/buildbot/filters'
20_SSH_KEYS = os.path.expanduser('~/.ssh/fuchsia_authorized_keys')
21
22
23class VersionNotFoundError(Exception):
24    """Thrown when version info cannot be retrieved from device."""
25
26
27def get_ssh_keys() -> str:
28    """Returns path of Fuchsia ssh keys."""
29
30    return _SSH_KEYS
31
32
33def running_unattended() -> bool:
34    """Returns true if running non-interactively.
35
36    When running unattended, confirmation prompts and the like are suppressed.
37    """
38
39    # TODO(crbug/1401387): Change to mixin based approach.
40    return 'SWARMING_SERVER' in os.environ
41
42
43def get_host_arch() -> str:
44    """Retrieve CPU architecture of the host machine. """
45    host_arch = platform.machine()
46    # platform.machine() returns AMD64 on 64-bit Windows.
47    if host_arch in ['x86_64', 'AMD64']:
48        return 'x64'
49    if host_arch in ['aarch64', 'arm64']:
50        return 'arm64'
51    raise NotImplementedError('Unsupported host architecture: %s' % host_arch)
52
53
54def add_exec_to_file(file: str) -> None:
55    """Add execution bits to a file.
56
57    Args:
58        file: path to the file.
59    """
60    file_stat = os.stat(file)
61    os.chmod(file, file_stat.st_mode | stat.S_IXUSR)
62
63
64def _add_exec_to_pave_binaries(system_image_dir: str):
65    """Add exec to required pave files.
66
67    The pave files may vary depending if a product-bundle or a prebuilt images
68    directory is being used.
69    Args:
70      system_image_dir: string path to the directory containing the pave files.
71    """
72    pb_files = [
73        'pave.sh',
74        os.path.join(f'host_{get_host_arch()}', 'bootserver')
75    ]
76    image_files = [
77        'pave.sh',
78        os.path.join(f'bootserver.exe.linux-{get_host_arch()}')
79    ]
80    use_pb_files = os.path.exists(os.path.join(system_image_dir, pb_files[1]))
81    for f in pb_files if use_pb_files else image_files:
82        add_exec_to_file(os.path.join(system_image_dir, f))
83
84
85def pave(image_dir: str, target_id: Optional[str])\
86        -> subprocess.CompletedProcess:
87    """"Pave a device using the pave script inside |image_dir|."""
88    _add_exec_to_pave_binaries(image_dir)
89    pave_command = [
90        os.path.join(image_dir, 'pave.sh'), '--authorized-keys',
91        get_ssh_keys(), '-1'
92    ]
93    if target_id:
94        pave_command.extend(['-n', target_id])
95    return subprocess.run(pave_command, check=True, text=True, timeout=300)
96
97
98def parse_host_port(host_port_pair: str) -> Tuple[str, int]:
99    """Parses a host name or IP address and a port number from a string of
100    any of the following forms:
101    - hostname:port
102    - IPv4addy:port
103    - [IPv6addy]:port
104
105    Returns:
106        A tuple of the string host name/address and integer port number.
107
108    Raises:
109        ValueError if `host_port_pair` does not contain a colon or if the
110        substring following the last colon cannot be converted to an int.
111    """
112
113    host, port = host_port_pair.rsplit(':', 1)
114
115    # Strip the brackets if the host looks like an IPv6 address.
116    if len(host) >= 4 and host[0] == '[' and host[-1] == ']':
117        host = host[1:-1]
118    return (host, int(port))
119
120
121def get_ssh_prefix(host_port_pair: str) -> List[str]:
122    """Get the prefix of a barebone ssh command."""
123
124    ssh_addr, ssh_port = parse_host_port(host_port_pair)
125    sshconfig = os.path.join(os.path.dirname(__file__), 'sshconfig')
126    return ['ssh', '-F', sshconfig, ssh_addr, '-p', str(ssh_port)]
127
128
129def install_symbols(package_paths: Iterable[str],
130                    fuchsia_out_dir: str) -> None:
131    """Installs debug symbols for a package into the GDB-standard symbol
132    directory located in fuchsia_out_dir."""
133
134    symbol_root = os.path.join(fuchsia_out_dir, '.build-id')
135    for path in package_paths:
136        package_dir = os.path.dirname(path)
137        ids_txt_path = os.path.join(package_dir, 'ids.txt')
138        with open(ids_txt_path, 'r') as f:
139            for entry in f:
140                build_id, binary_relpath = entry.strip().split(' ')
141                binary_abspath = os.path.abspath(
142                    os.path.join(package_dir, binary_relpath))
143                symbol_dir = os.path.join(symbol_root, build_id[:2])
144                symbol_file = os.path.join(symbol_dir, build_id[2:] + '.debug')
145                if not os.path.exists(symbol_dir):
146                    os.makedirs(symbol_dir)
147
148                if os.path.islink(symbol_file) or os.path.exists(symbol_file):
149                    # Clobber the existing entry to ensure that the symlink's
150                    # target is up to date.
151                    os.unlink(symbol_file)
152                os.symlink(os.path.relpath(binary_abspath, symbol_dir),
153                           symbol_file)
154
155
156# TODO(crbug.com/1279803): Until one can send files to the device when running
157# a test, filter files must be read from the test package.
158def map_filter_file_to_package_file(filter_file: str) -> str:
159    """Returns the path to |filter_file| within the test component's package."""
160
161    if not _FILTER_DIR in filter_file:
162        raise ValueError('CFv2 tests only support registered filter files '
163                         'present in the test package')
164    return '/pkg/' + filter_file[filter_file.index(_FILTER_DIR):]
165
166
167def get_sdk_hash(system_image_dir: str) -> Tuple[str, str]:
168    """Read version of hash in pre-installed package directory.
169    Returns:
170        Tuple of (product, version) of image to be installed.
171    Raises:
172        VersionNotFoundError: if contents of buildargs.gn cannot be found or the
173        version number cannot be extracted.
174    """
175
176    # TODO(crbug.com/1261961): Stop processing buildargs.gn directly.
177    args_file = os.path.join(system_image_dir, _BUILD_ARGS)
178    if not os.path.exists(args_file):
179        args_file = os.path.join(system_image_dir, _ARGS_FILE)
180
181    if not os.path.exists(args_file):
182        raise VersionNotFoundError(
183            f'Dir {system_image_dir} did not contain {_BUILD_ARGS} or '
184            f'{_ARGS_FILE}')
185
186    with open(args_file) as f:
187        contents = f.readlines()
188    if not contents:
189        raise VersionNotFoundError('Could not retrieve %s' % args_file)
190    version_key = 'build_info_version'
191    product_key = 'build_info_product'
192    info_keys = [product_key, version_key]
193    version_info = {}
194    for line in contents:
195        for key in info_keys:
196            match = re.match(r'%s = "(.*)"' % key, line)
197            if match:
198                version_info[key] = match.group(1)
199    if not (version_key in version_info and product_key in version_info):
200        raise VersionNotFoundError(
201            'Could not extract version info from %s. Contents: %s' %
202            (args_file, contents))
203
204    return (version_info[product_key], version_info[version_key])
205