1#!/usr/bin/env python3 2# Copyright 2023 The ChromiumOS Authors 3# Use of this source code is governed by a BSD-style license that can be 4# found in the LICENSE file. 5 6""" 7Provides general utility functions. 8""" 9 10import argparse 11import contextlib 12import datetime 13import functools 14import os 15import re 16import subprocess 17import sys 18import urllib 19import urllib.request 20import urllib.error 21from pathlib import Path 22from subprocess import DEVNULL, PIPE, STDOUT # type: ignore 23from typing import ( 24 Dict, 25 List, 26 NamedTuple, 27 Optional, 28 Tuple, 29 Union, 30) 31 32PathLike = Union[Path, str] 33 34# Regex that matches ANSI escape sequences 35ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])") 36 37 38def find_crosvm_root(): 39 "Walk up from CWD until we find the crosvm root dir." 40 path = Path("").resolve() 41 while True: 42 if (path / "tools/impl/common.py").is_file(): 43 return path 44 if path.parent: 45 path = path.parent 46 else: 47 raise Exception("Cannot find crosvm root dir.") 48 49 50"Root directory of crosvm derived from CWD." 51CROSVM_ROOT = find_crosvm_root() 52 53"Cargo.toml file of crosvm" 54CROSVM_TOML = CROSVM_ROOT / "Cargo.toml" 55 56""" 57Root directory of crosvm devtools. 58 59May be different from `CROSVM_ROOT/tools`, which is allows you to run the crosvm dev 60tools from this directory on another crosvm repo. 61 62Use this if you want to call crosvm dev tools, which will use the scripts relative 63to this file. 64""" 65TOOLS_ROOT = Path(__file__).parent.parent.resolve() 66 67"Cache directory that is preserved between builds in CI." 68CACHE_DIR = Path(os.environ.get("CROSVM_CACHE_DIR", os.environ.get("TMPDIR", "/tmp"))) 69 70# Ensure that we really found the crosvm root directory 71assert 'name = "crosvm"' in CROSVM_TOML.read_text() 72 73# List of times recorded by `record_time` which will be printed if --timing-info is provided. 74global_time_records: List[Tuple[str, datetime.timedelta]] = [] 75 76 77def crosvm_target_dir(): 78 crosvm_target = os.environ.get("CROSVM_TARGET_DIR") 79 cargo_target = os.environ.get("CARGO_TARGET_DIR") 80 if crosvm_target: 81 return Path(crosvm_target) 82 elif cargo_target: 83 return Path(cargo_target) / "crosvm" 84 else: 85 return CROSVM_ROOT / "target/crosvm" 86 87 88@functools.lru_cache(None) 89def parse_common_args(): 90 """ 91 Parse args common to all scripts 92 93 These args are parsed separately of the run_main/run_commands method so we can access 94 verbose/etc before the commands arguments are parsed. 95 """ 96 parser = argparse.ArgumentParser(add_help=False) 97 add_common_args(parser) 98 return parser.parse_known_args()[0] 99 100 101def add_common_args(parser: argparse.ArgumentParser): 102 "These args are added to all commands." 103 parser.add_argument( 104 "--color", 105 default="auto", 106 choices=("always", "never", "auto"), 107 help="Force enable or disable colors. Defaults to automatic detection.", 108 ) 109 parser.add_argument( 110 "--verbose", 111 "-v", 112 action="store_true", 113 default=False, 114 help="Print more details about the commands this script is running.", 115 ) 116 parser.add_argument( 117 "--very-verbose", 118 "-vv", 119 action="store_true", 120 default=False, 121 help="Print more debug output", 122 ) 123 parser.add_argument( 124 "--timing-info", 125 action="store_true", 126 default=False, 127 help="Print info on how long which parts of the command take", 128 ) 129 130 131def verbose(): 132 return very_verbose() or parse_common_args().verbose 133 134 135def very_verbose(): 136 return parse_common_args().very_verbose 137 138 139def color_enabled(): 140 color_arg = parse_common_args().color 141 if color_arg == "never": 142 return False 143 if color_arg == "always": 144 return True 145 return sys.stdout.isatty() 146 147 148def find_scripts(path: Path, shebang: str): 149 for file in path.glob("*"): 150 if file.is_file() and file.open(errors="ignore").read(512).startswith(f"#!{shebang}"): 151 yield file 152 153 154def confirm(message: str, default: bool = False): 155 print(message, "[y/N]" if default == False else "[Y/n]", end=" ", flush=True) 156 response = sys.stdin.readline().strip() 157 if response in ("y", "Y"): 158 return True 159 if response in ("n", "N"): 160 return False 161 return default 162 163 164def is_cros_repo(): 165 "Returns true if the crosvm repo is a symlink or worktree to a CrOS repo checkout." 166 dot_git = CROSVM_ROOT / ".git" 167 if not dot_git.is_symlink() and dot_git.is_dir(): 168 return False 169 return (cros_repo_root() / ".repo").exists() 170 171 172def cros_repo_root(): 173 "Root directory of the CrOS repo checkout." 174 return (CROSVM_ROOT / "../../..").resolve() 175 176 177def is_kiwi_repo(): 178 "Returns true if the crosvm repo contains .kiwi_repo file." 179 dot_kiwi_repo = CROSVM_ROOT / ".kiwi_repo" 180 return dot_kiwi_repo.exists() 181 182 183def kiwi_repo_root(): 184 "Root directory of the kiwi repo checkout." 185 return (CROSVM_ROOT / "../..").resolve() 186 187def is_aosp_repo(): 188 "Returns true if the crosvm repo is an AOSP repo checkout." 189 android_bp = CROSVM_ROOT / "Android.bp" 190 return android_bp.exists() 191 192def aosp_repo_root(): 193 "Root directory of AOSP repo checkout." 194 return (CROSVM_ROOT / "../..").resolve() 195 196def is_aosp_repo(): 197 "Returns true if the crosvm repo is an AOSP repo checkout." 198 android_bp = CROSVM_ROOT / "Android.bp" 199 return android_bp.exists() 200 201 202def aosp_repo_root(): 203 "Root directory of AOSP repo checkout." 204 return (CROSVM_ROOT / "../..").resolve() 205 206 207def sudo_is_passwordless(): 208 # Run with --askpass but no askpass set, succeeds only if passwordless sudo 209 # is available. 210 (ret, _) = subprocess.getstatusoutput("SUDO_ASKPASS=false sudo --askpass true") 211 return ret == 0 212 213 214def rust_sysroot(): 215 "Returns path to the rust sysroot (e.g. ~/.rustup/toolchains/$version)." 216 from .command import cmd 217 218 return Path(cmd("rustc --print=sysroot").stdout()) 219 220 221SHORTHANDS = { 222 "mingw64": "x86_64-pc-windows-gnu", 223 "msvc64": "x86_64-pc-windows-msvc", 224 "armhf": "armv7-unknown-linux-gnueabihf", 225 "aarch64": "aarch64-unknown-linux-gnu", 226 "riscv64": "riscv64gc-unknown-linux-gnu", 227 "x86_64": "x86_64-unknown-linux-gnu", 228 "android": "aarch64-linux-android", 229} 230 231 232class Triple(NamedTuple): 233 """ 234 Build triple in cargo format. 235 236 The format is: <arch><sub>-<vendor>-<sys>-<abi>, However, we will treat <arch><sub> as a single 237 arch to simplify things. 238 """ 239 240 arch: str 241 vendor: str 242 sys: Optional[str] 243 abi: Optional[str] 244 245 @classmethod 246 def from_shorthand(cls, shorthand: str): 247 "These shorthands make it easier to specify triples on the command line." 248 if "-" in shorthand: 249 triple = shorthand 250 elif shorthand in SHORTHANDS: 251 triple = SHORTHANDS[shorthand] 252 else: 253 raise Exception(f"Not a valid build triple shorthand: {shorthand}") 254 return cls.from_str(triple) 255 256 @classmethod 257 def from_str(cls, triple: str): 258 parts = triple.split("-") 259 if len(parts) < 2: 260 raise Exception(f"Unsupported triple {triple}") 261 return cls( 262 parts[0], 263 parts[1], 264 parts[2] if len(parts) > 2 else None, 265 parts[3] if len(parts) > 3 else None, 266 ) 267 268 @classmethod 269 def from_linux_arch(cls, arch: str): 270 "Rough logic to convert the output of `arch` into a corresponding linux build triple." 271 if arch == "armhf": 272 return cls.from_str("armv7-unknown-linux-gnueabihf") 273 else: 274 return cls.from_str(f"{arch}-unknown-linux-gnu") 275 276 @classmethod 277 def host_default(cls): 278 "Returns the default build triple of the host." 279 rustc_info = subprocess.check_output(["rustc", "-vV"], text=True) 280 match = re.search(r"host: (\S+)", rustc_info) 281 if not match: 282 raise Exception(f"Cannot parse rustc info: {rustc_info}") 283 return cls.from_str(match.group(1)) 284 285 @property 286 def feature_flag(self): 287 triple_to_shorthand = {v: k for k, v in SHORTHANDS.items()} 288 shorthand = triple_to_shorthand.get(str(self)) 289 if not shorthand: 290 raise Exception(f"No feature set for triple {self}") 291 return f"all-{shorthand}" 292 293 @property 294 def target_dir(self): 295 return crosvm_target_dir() / str(self) 296 297 def get_cargo_env(self): 298 """Environment variables to make cargo use the test target.""" 299 env: Dict[str, str] = {} 300 cargo_target = str(self) 301 env["CARGO_BUILD_TARGET"] = cargo_target 302 env["CARGO_TARGET_DIR"] = str(self.target_dir) 303 env["CROSVM_TARGET_DIR"] = str(crosvm_target_dir()) 304 # Android builds are not fully supported and can only be used to run clippy. 305 # Underlying libraries (e.g. minijail) will be built for linux instead 306 # TODO(denniskempin): This could be better done with [env] in Cargo.toml if it supported 307 # per-target configuration. See https://github.com/rust-lang/cargo/issues/10273 308 if str(self).endswith("-linux-android"): 309 env["MINIJAIL_DO_NOT_BUILD"] = "true" 310 env["MINIJAIL_BINDGEN_TARGET"] = f"{self.arch}-unknown-linux-gnu" 311 # Hack: Rust after 1.77 on windows will fail to find dlltool.exe when building proc-macros. 312 # The tool exists in the self contained gnu toolchain, but somehow the PATH is missing. 313 # TODO(b/396467061): Remove after fixed upstream 314 if str(self).endswith("windows-gnu"): 315 env["PATH"] = ( 316 os.environ["PATH"] 317 + ";" 318 + f"{rust_sysroot()}/lib/rustlib/{str(self)}/bin/self-contained" 319 ) 320 return env 321 322 def __str__(self): 323 parts = [self.arch, self.vendor] 324 if self.sys: 325 parts = [*parts, self.sys] 326 if self.abi: 327 parts = [*parts, self.abi] 328 return "-".join(parts) 329 330 331def download_file(url: str, filename: Path, attempts: int = 3): 332 assert attempts > 0 333 while True: 334 attempts -= 1 335 try: 336 urllib.request.urlretrieve(url, filename) 337 return 338 except Exception as e: 339 if attempts == 0: 340 raise e 341 else: 342 print("Download failed:", e) 343 344 345def strip_ansi_escape_sequences(line: str) -> str: 346 return ANSI_ESCAPE.sub("", line) 347 348 349def ensure_packages_exist(*packages: str): 350 """ 351 Exits if one of the listed packages does not exist. 352 """ 353 missing_packages: List[str] = [] 354 355 for package in packages: 356 try: 357 __import__(package) 358 except ImportError: 359 missing_packages.append(package) 360 361 if missing_packages: 362 debian_packages = [f"python3-{p}" for p in missing_packages] 363 package_list = " ".join(debian_packages) 364 print("Missing python dependencies. Please re-run ./tools/setup") 365 print(f"Or `sudo apt install {package_list}`") 366 sys.exit(1) 367 368 369@contextlib.contextmanager 370def record_time(title: str): 371 """ 372 Records wall-time of how long this context lasts. 373 374 The results will be printed at the end of script executation if --timing-info is specified. 375 """ 376 start_time = datetime.datetime.now() 377 try: 378 yield 379 finally: 380 global_time_records.append((title, datetime.datetime.now() - start_time)) 381 382 383def print_timing_info(): 384 print() 385 print("Timing info:") 386 print() 387 for title, delta in global_time_records: 388 print(f" {title:20} {delta.total_seconds():.2f}s") 389