# Copyright 2023 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from __future__ import annotations

import abc
import collections.abc
import contextlib
import datetime as dt
import functools
import inspect
import logging
import os
import pathlib
import platform as py_platform
import shlex
import shutil
import subprocess
import sys
import tempfile
import time
import urllib.error
import urllib.parse
import urllib.request
from typing import (TYPE_CHECKING, Any, Callable, Dict, Final, Generator,
                    Iterable, Iterator, List, Mapping, Optional, Sequence,
                    Tuple, Union)

import psutil

from crossbench import path as pth
from crossbench.plt.arch import MachineArch
from crossbench.plt.bin import Binary

if TYPE_CHECKING:
  from crossbench.path import LocalPath
  from crossbench.types import JsonDict


# A single command-line argument: anything path-like.
CmdArg = pth.AnyPathLike
# Command lines given as a list or as a tuple of arguments.
ListCmdArgs = List[CmdArg]
TupleCmdArgs = Tuple[CmdArg, ...]
CmdArgs = Union[ListCmdArgs, TupleCmdArgs]


class Environ(collections.abc.MutableMapping, metaclass=abc.ABCMeta):
  """Abstract base for mutable mappings exposing a platform's environment."""


class LocalEnviron(Environ):
  """Environ implementation that forwards every operation to os.environ."""

  def __init__(self) -> None:
    self._environ = os.environ

  def __getitem__(self, key: str) -> str:
    return self._environ[key]

  def __setitem__(self, key: str, item: str) -> None:
    self._environ[key] = item

  def __delitem__(self, key: str) -> None:
    del self._environ[key]

  def __iter__(self) -> Iterator[str]:
    return iter(self._environ)

  def __len__(self) -> int:
    return len(self._environ)


class SubprocessError(subprocess.CalledProcessError):
  """ Custom version that also prints stderr for debugging"""

  def __init__(self, platform: Platform, process) -> None:
    self.platform = platform
    super().__init__(process.returncode, shlex.join(map(str, process.args)),
                     process.stdout, process.stderr)

  def __str__(self) -> str:
    super_str = super().__str__()
    if not self.stderr:
      return f"{self.platform}: {super_str}"
    stderr = self.stderr
    # Formatting an error must never raise itself: tolerate undecodable bytes
    # and stderr that was already captured as text.
    if isinstance(stderr, bytes):
      stderr = stderr.decode(errors="replace")
    return f"{self.platform}: {super_str}\nstderr:{stderr}"


# psutil errors that are ignored when inspecting processes.
_IGNORED_PROCESS_EXCEPTIONS: Final = (psutil.NoSuchProcess, psutil.AccessDenied,
                                      psutil.ZombieProcess)

DEFAULT_CACHE_DIR = pth.LocalPath(__file__).parents[2] / "cache"


class Platform(abc.ABC):
  """Abstract description of the machine on which commands are run.

  The default implementations in this class operate on the local host. Most of
  them call assert_is_local() (directly or via local_path()) and therefore
  raise a RuntimeError when is_remote is True.
  """
  # pylint: disable=locally-disabled, redefined-builtin

  def __init__(self) -> None:
    self._binary_lookup_override: Dict[str, pth.AnyPath] = {}
    # Only local platforms get a default cache dir, see set_cache_dir().
    self._cache_dir: Optional[pth.AnyPath] = None
    if self.is_local:
      self._cache_dir = DEFAULT_CACHE_DIR

  def assert_is_local(self) -> None:
    """Raise a RuntimeError naming the calling method if is_local is False."""
    if self.is_local:
      return
    # stack()[1] is the frame of the method that called assert_is_local().
    caller = inspect.stack()[1].function
    raise RuntimeError(f"{type(self).__name__}.{caller}(...) is not supported "
                       "on remote platform")

  @property
  @abc.abstractmethod
  def name(self) -> str:
    pass

  @property
  @abc.abstractmethod
  def version(self) -> str:
    pass

  @property
  @abc.abstractmethod
  def device(self) -> str:
    pass

  @property
  @abc.abstractmethod
  def cpu(self) -> str:
    pass

  @property
  def full_version(self) -> str:
    return f"{self.name} {self.version} {self.machine}"

  def __str__(self) -> str:
    return ".".join(self.key) + (".remote" if self.is_remote else ".local")

  @property
  def is_remote(self) -> bool:
    return False

  @property
  def is_local(self) -> bool:
    return not self.is_remote

  @property
  def host_platform(self) -> Platform:
    return self

  @functools.cached_property
  def machine(self) -> MachineArch:
    """The normalized machine architecture, computed once per instance.

    Raises:
      NotImplementedError: If the raw architecture string is not recognized.
    """
    raw = self._raw_machine_arch()
    if raw in ("i386", "i686", "x86", "ia32"):
      return MachineArch.IA32
    if raw in ("x86_64", "AMD64"):
      return MachineArch.X64
    if raw in ("arm64", "aarch64"):
      return MachineArch.ARM_64
    # This must be a one-element tuple: `raw in ("arm")` is a substring test
    # against the string "arm" and would also accept "", "a", "rm", ...
    if raw in ("arm",):
      return MachineArch.ARM_32
    raise NotImplementedError(f"Unsupported machine type: {raw}")

  def _raw_machine_arch(self) -> str:
    self.assert_is_local()
    return py_platform.machine()

  @property
  def is_ia32(self) -> bool:
    return self.machine == MachineArch.IA32

  @property
  def is_x64(self) -> bool:
    return self.machine == MachineArch.X64

  @property
  def is_arm64(self) -> bool:
    return self.machine == MachineArch.ARM_64

  @property
  def key(self) -> Tuple[str, str]:
    return (self.name, str(self.machine))

  @property
  def is_macos(self) -> bool:
    return False

  @property
  def is_linux(self) -> bool:
    return False

  @property
  def is_android(self) -> bool:
    return False

  @property
  def is_chromeos(self) -> bool:
    return False

  @property
  def is_posix(self) -> bool:
    return self.is_macos or self.is_linux or self.is_android

  @property
  def is_win(self) -> bool:
    return False

  @property
  def is_remote_ssh(self) -> bool:
    return False

  @property
  def environ(self) -> Environ:
    self.assert_is_local()
    return LocalEnviron()

  @property
  def is_battery_powered(self) -> bool:
    """True if psutil reports a battery that is not plugged in."""
    self.assert_is_local()
    # psutil does not define sensors_battery on every OS; a plain attribute
    # access would raise an AttributeError there.
    sensors_battery = getattr(psutil, "sensors_battery", None)
    if not sensors_battery:
      return False
    status = sensors_battery()
    if not status:
      return False
    return not status.power_plugged

  def _search_executable(
      self,
      name: str,
      macos: Sequence[str],
      win: Sequence[str],
      linux: Sequence[str],
      lookup_callable: Callable[[pth.AnyPath], Optional[pth.AnyPath]],
  ) -> pth.AnyPath:
    """Return the first existing result of lookup_callable over the candidates.

    The candidate list is picked from macos / win / linux depending on this
    platform.

    Raises:
      ValueError: If there are no candidates for this platform or if none of
        them resolves to an existing path.
    """
    executables: Sequence[str] = []
    if self.is_macos:
      executables = macos
    elif self.is_win:
      executables = win
    elif self.is_linux:
      executables = linux
    if not executables:
      raise ValueError(f"Executable {name} not supported on {self}")
    for name_or_path in executables:
      path = self.local_path(name_or_path).expanduser()
      binary = lookup_callable(path)
      if binary and self.exists(binary):
        return binary
    raise ValueError(f"Executable {name} not found on {self}")

  def search_app_or_executable(
      self,
      name: str,
      macos: Sequence[str] = (),
      win: Sequence[str] = (),
      linux: Sequence[str] = ()
  ) -> pth.AnyPath:
    return self._search_executable(name, macos, win, linux, self.search_app)

  def search_platform_binary(
      self,
      name: str,
      macos: Sequence[str] = (),
      win: Sequence[str] = (),
      linux: Sequence[str] = ()
  ) -> pth.AnyPath:
    return self._search_executable(name, macos, win, linux, self.search_binary)

  def search_app(self, app_or_bin: pth.AnyPath) -> Optional[pth.AnyPath]:
    """Look up an application bundle (macos) or binary (all other platforms) in
    the common search paths.
    """
    return self.search_binary(app_or_bin)

  @abc.abstractmethod
  def search_binary(self, app_or_bin: pth.AnyPathLike) -> Optional[pth.AnyPath]:
    """Look up a binary in the common search paths based of a path or a single
    segment path with just the binary name.
    Returns the location of the binary (and not the .app bundle on macOS).
    """

  @abc.abstractmethod
  def app_version(self, app_or_bin: pth.AnyPathLike) -> str:
    pass

  @property
  def has_display(self) -> bool:
    """Return a bool whether the platform has an active display.
    This can be false on linux without $DISPLAY, true on all other platforms."""
    return True

  def sleep(self, seconds: Union[int, float, dt.timedelta]) -> None:
    """Block for the given duration; a duration of 0 returns immediately."""
    if isinstance(seconds, dt.timedelta):
      seconds = seconds.total_seconds()
    if seconds == 0:
      return
    logging.debug("WAIT %ss", seconds)
    time.sleep(seconds)

  def which(self, binary_name: pth.AnyPathLike) -> Optional[pth.AnyPath]:
    """Resolve binary_name, preferring a registered override over shutil.which.

    Raises:
      ValueError: If binary_name is empty.
    """
    if not binary_name:
      raise ValueError("Got empty path")
    self.assert_is_local()
    if override := self.lookup_binary_override(binary_name):
      return override
    if result := shutil.which(os.fspath(binary_name)):
      return self.path(result)
    return None

  def lookup_binary_override(
      self, binary_name: pth.AnyPathLike) -> Optional[pth.AnyPath]:
    return self._binary_lookup_override.get(os.fspath(binary_name))

  def set_binary_lookup_override(self, binary_name: pth.AnyPathLike,
                                 new_path: Optional[pth.AnyPath]) -> None:
    """Register new_path as lookup result for binary_name.

    Passing None as new_path removes a previously registered override.

    Raises:
      ValueError: If search_binary() cannot find new_path.
    """
    name = os.fspath(binary_name)
    if new_path is None:
      prev_result = self._binary_lookup_override.pop(name, None)
      if prev_result is None:
        logging.debug(
            "Could not remove binary override for %s as it was never set",
            binary_name)
      return
    if self.search_binary(new_path) is None:
      raise ValueError(f"Suggested binary override for {repr(name)} "
                       f"does not exist: {new_path}")
    self._binary_lookup_override[name] = new_path

  @contextlib.contextmanager
  def override_binary(self, binary: Union[pth.AnyPathLike, Binary],
                      result: Optional[pth.AnyPath]):
    """Context manager that temporarily overrides the lookup of a binary.

    The previous override (or the lack of one) is restored on exit.
    """
    binary_name: pth.AnyPathLike = ""
    if isinstance(binary, Binary):
      if override := binary.platform_path(self):
        binary_name = override[0]
      else:
        raise RuntimeError("Cannot override binary:"
                           f" {binary} is not supported supported on {self}")
    else:
      binary_name = binary
    prev_override = self.lookup_binary_override(binary_name)
    self.set_binary_lookup_override(binary_name, result)
    try:
      yield
    finally:
      self.set_binary_lookup_override(binary_name, prev_override)

  def processes(self,
                attrs: Optional[List[str]] = None) -> List[Dict[str, Any]]:
    """Return the as_dict() info of all processes psutil can inspect."""
    # TODO(cbruni): support remote platforms
    # Same guard as all sibling methods; a bare assert is stripped under -O.
    self.assert_is_local()
    return self._collect_process_dict(psutil.process_iter(attrs=attrs))

  def process_running(self, process_name_list: List[str]) -> Optional[str]:
    """Return the name of the first process whose lower-cased name is listed."""
    self.assert_is_local()
    # TODO(cbruni): support remote platforms
    for proc in psutil.process_iter(attrs=["name"]):
      try:
        if proc.name().lower() in process_name_list:
          return proc.name()
      except _IGNORED_PROCESS_EXCEPTIONS:
        pass
    return None

  def process_children(self,
                       parent_pid: int,
                       recursive: bool = False) -> List[Dict[str, Any]]:
    """Return info dicts of the children of parent_pid, [] if inaccessible."""
    self.assert_is_local()
    # TODO(cbruni): support remote platforms
    try:
      process = psutil.Process(parent_pid)
    except _IGNORED_PROCESS_EXCEPTIONS:
      return []
    return self._collect_process_dict(process.children(recursive=recursive))

  def _collect_process_dict(
      self, process_iterator: Iterable[psutil.Process]) -> List[Dict[str, Any]]:
    """Convert processes to dicts, skipping those that cannot be inspected."""
    process_info_list: List[Dict[str, Any]] = []
    for process in process_iterator:
      try:
        process_info_list.append(process.as_dict())
      except _IGNORED_PROCESS_EXCEPTIONS:
        pass
    return process_info_list

  def process_info(self, pid: int) -> Optional[Dict[str, Any]]:
    """Return the info dict for pid, or None if it cannot be inspected."""
    self.assert_is_local()
    # TODO(cbruni): support remote platforms
    try:
      return psutil.Process(pid).as_dict()
    except _IGNORED_PROCESS_EXCEPTIONS:
      return None

  def foreground_process(self) -> Optional[Dict[str, Any]]:
    return None

  def terminate(self, proc_pid: int) -> None:
    """Terminate all (recursive) children of proc_pid, then proc_pid itself."""
    self.assert_is_local()
    # TODO(cbruni): support remote platforms
    process = psutil.Process(proc_pid)
    for proc in process.children(recursive=True):
      proc.terminate()
    process.terminate()

  @property
  def default_tmp_dir(self) -> pth.AnyPath:
    self.assert_is_local()
    return self.path(tempfile.gettempdir())

  def port_forward(self, local_port: int, remote_port: int) -> int:
    """No-op on a local platform: both ports have to be identical.

    Raises:
      ValueError: If the two ports differ.
    """
    if remote_port != local_port:
      raise ValueError("Cannot forward a remote port on a local platform.")
    self.assert_is_local()
    return local_port

  def stop_port_forward(self, local_port: int) -> None:
    del local_port
    self.assert_is_local()

  def reverse_port_forward(self, remote_port: int, local_port: int) -> int:
    """No-op on a local platform: both ports have to be identical.

    Raises:
      ValueError: If the two ports differ.
    """
    if remote_port != local_port:
      raise ValueError("Cannot forward a remote port on a local platform.")
    self.assert_is_local()
    return remote_port

  def stop_reverse_port_forward(self, remote_port: int) -> None:
    del remote_port
    self.assert_is_local()

  def local_cache_dir(self, name: Optional[str] = None) -> pth.LocalPath:
    return self.local_path(self.cache_dir(name))

  def cache_dir(self, name: Optional[str] = None) -> pth.AnyPath:
    """Return (and create) the cache dir, or the sub-directory for name."""
    assert self._cache_dir, "missing cache dir"
    cache_path = self._cache_dir
    if name:
      cache_path = cache_path / pth.safe_filename(name)
    self.mkdir(cache_path, parents=True, exist_ok=True)
    return cache_path

  def set_cache_dir(self, path: pth.AnyPath) -> None:
    self._cache_dir = path
    self.mkdir(path, parents=True, exist_ok=True)

  def cat(self, file: pth.AnyPathLike, encoding: str = "utf-8") -> str:
    """Meow! I return the file contents as a str."""
    with self.local_path(file).open(encoding=encoding) as f:
      return f.read()

  def cat_bytes(self, file: pth.AnyPathLike) -> bytes:
    """Hiss! I return the file contents as bytes."""
    with self.local_path(file).open("rb") as f:
      return f.read()

  def get_file_contents(self,
                        file: pth.AnyPathLike,
                        encoding: str = "utf-8") -> str:
    return self.cat(file, encoding)

  def set_file_contents(self,
                        file: pth.AnyPathLike,
                        data: str,
                        encoding: str = "utf-8") -> None:
    with self.local_path(file).open("w", encoding=encoding) as f:
      f.write(data)

  def pull(self, from_path: pth.AnyPath,
           to_path: pth.LocalPath) -> pth.LocalPath:
    """ Download / Copy a (remote) file to the local filesystem.
    By default this is just a copy operation on the local filesystem.
    """
    self.assert_is_local()
    return self.local_path(self.copy_file(from_path, to_path))

  def push(self, from_path: pth.LocalPath, to_path: pth.AnyPath) -> pth.AnyPath:
    """ Copy a local file to this (remote) platform.
    By default this is just a copy operation on the local filesystem.
    """
    self.assert_is_local()
    return self.copy_file(from_path, to_path)

  def copy(self, from_path: pth.AnyPath, to_path: pth.AnyPath) -> pth.AnyPath:
    """ Convenience implementation for copying local files and dirs """
    if not self.exists(from_path):
      raise ValueError(f"Cannot copy non-existing source path: {from_path}")
    if self.is_dir(from_path):
      return self.copy_dir(from_path, to_path)
    return self.copy_file(from_path, to_path)

  def copy_dir(self, from_path: pth.AnyPathLike,
               to_path: pth.AnyPathLike) -> pth.AnyPath:
    """Copy a directory tree, creating the parent dirs of to_path first."""
    from_path = self.local_path(from_path)
    to_path = self.local_path(to_path)
    self.mkdir(to_path.parent, parents=True, exist_ok=True)
    shutil.copytree(os.fspath(from_path), os.fspath(to_path))
    return to_path

  def copy_file(self, from_path: pth.AnyPathLike,
                to_path: pth.AnyPathLike) -> pth.AnyPath:
    """Copy a single file, creating the parent dirs of to_path first."""
    from_path = self.local_path(from_path)
    to_path = self.local_path(to_path)
    self.mkdir(to_path.parent, parents=True, exist_ok=True)
    shutil.copy2(os.fspath(from_path), os.fspath(to_path))
    return to_path

  def rm(self,
         path: pth.AnyPathLike,
         dir: bool = False,
         missing_ok: bool = False) -> None:
    """Remove a single file on this platform.

    With dir=True the whole directory tree at path is removed instead.
    """
    path = self.local_path(path)
    if dir:
      if missing_ok and not self.exists(path):
        return
      shutil.rmtree(os.fspath(path))
    else:
      path.unlink(missing_ok)

  def rename(self, src_path: pth.AnyPathLike,
             dst_path: pth.AnyPathLike) -> pth.AnyPath:
    """Rename src_path to dst_path and return the result of Path.rename()."""
    return self.local_path(src_path).rename(dst_path)

  def symlink_or_copy(self, src: pth.AnyPathLike,
                      dst: pth.AnyPathLike) -> pth.AnyPath:
    """Windows does not support symlinking without admin support.
    Copy files on windows (see WinPlatform) but symlink everywhere else."""
    assert not self.is_win, "Unsupported operation 'symlink_or_copy' on windows"
    dst_path = self.local_path(dst)
    dst_path.symlink_to(self.path(src))
    return dst_path

  def path(self, path: pth.AnyPathLike) -> pth.AnyPath:
    """Used to convert any paths and strings to a platform specific
    remote path.
    For instance a remote ADB platform on windows returns posix paths:
      posix_path = adb_remote_platform.path(windows_path)
    This is used when passing out platform specific paths to remote shell
    commands.
    """
    return self.local_path(path)

  def local_path(self, path: pth.AnyPathLike) -> pth.LocalPath:
    self.assert_is_local()
    return pth.LocalPath(path)

  def absolute(self, path: pth.AnyPathLike) -> pth.AnyPath:
    """Convert an arbitrary path to a platform-specific absolute path"""
    platform_path: pth.AnyPath = self.path(path)
    if platform_path.is_absolute():
      return platform_path
    if self.is_local:
      return self.local_path(platform_path).absolute()
    raise RuntimeError(
        f"Converting relative to absolute paths is not supported on {self}")

  def home(self) -> pth.AnyPath:
    return pathlib.Path.home()

  def touch(self, path: pth.AnyPathLike) -> None:
    self.local_path(path).touch(exist_ok=True)

  def mkdir(self,
            path: pth.AnyPathLike,
            parents: bool = True,
            exist_ok: bool = True) -> None:
    self.local_path(path).mkdir(parents=parents, exist_ok=exist_ok)

  @contextlib.contextmanager
  def NamedTemporaryFile(  # pylint: disable=invalid-name
      self,
      prefix: Optional[str] = None,
      dir: Optional[pth.AnyPathLike] = None):
    """Context manager yielding a temporary file created on the host platform.

    The file is removed again on exit.
    """
    host = self.host_platform
    tmp_file: LocalPath = host.local_path(host.mktemp(prefix, dir))
    try:
      yield tmp_file
    finally:
      # Remove the file on the platform that created it. For local platforms
      # host_platform is self, so this is the same call as before.
      host.rm(tmp_file, missing_ok=True)

  def mkdtemp(self,
              prefix: Optional[str] = None,
              dir: Optional[pth.AnyPathLike] = None) -> pth.AnyPath:
    self.assert_is_local()
    return self.path(tempfile.mkdtemp(prefix=prefix, dir=dir))

  def mktemp(self,
             prefix: Optional[str] = None,
             dir: Optional[pth.AnyPathLike] = None) -> pth.AnyPath:
    """Create an empty temporary file and return its path."""
    self.assert_is_local()
    fd, name = tempfile.mkstemp(prefix=prefix, dir=dir)
    # Only the path is handed out, so close the descriptor right away.
    os.close(fd)
    return self.path(name)

  @contextlib.contextmanager
  def TemporaryDirectory(  # pylint: disable=invalid-name
      self,
      prefix: Optional[str] = None,
      dir: Optional[pth.AnyPathLike] = None):
    """Context manager yielding a temporary dir that is removed on exit."""
    tmp_dir = self.mkdtemp(prefix, dir)
    try:
      yield tmp_dir
    finally:
      self.rm(tmp_dir, dir=True, missing_ok=True)

  def exists(self, path: pth.AnyPathLike) -> bool:
    return self.local_path(path).exists()

  def is_file(self, path: pth.AnyPathLike) -> bool:
    return self.local_path(path).is_file()

  def is_dir(self, path: pth.AnyPathLike) -> bool:
    return self.local_path(path).is_dir()

  def iterdir(self,
              path: pth.AnyPathLike) -> Generator[pth.AnyPath, None, None]:
    return self.local_path(path).iterdir()

  def glob(self, path: pth.AnyPathLike,
           pattern: str) -> Generator[pth.AnyPath, None, None]:
    # TODO: support remotely
    return self.local_path(path).glob(pattern)

  def file_size(self, path: pth.AnyPathLike) -> int:
    # TODO: support remotely
    return self.local_path(path).stat().st_size

  def sh_stdout(self,
                *args: CmdArg,
                shell: bool = False,
                quiet: bool = False,
                encoding: str = "utf-8",
                stdin=None,
                env: Optional[Mapping[str, str]] = None,
                check: bool = True) -> str:
    """Run a command via sh_stdout_bytes() and return its decoded stdout."""
    result = self.sh_stdout_bytes(
        *args, shell=shell, quiet=quiet, stdin=stdin, env=env, check=check)
    return result.decode(encoding)

  def sh_stdout_bytes(self,
                      *args: CmdArg,
                      shell: bool = False,
                      quiet: bool = False,
                      stdin=None,
                      env: Optional[Mapping[str, str]] = None,
                      check: bool = True) -> bytes:
    """Run a command via sh() with captured output and return raw stdout."""
    completed_process = self.sh(
        *args,
        shell=shell,
        capture_output=True,
        quiet=quiet,
        stdin=stdin,
        env=env,
        check=check)
    return completed_process.stdout

  def popen(self,
            *args: CmdArg,
            bufsize=-1,
            shell: bool = False,
            stdout=None,
            stderr=None,
            stdin=None,
            env: Optional[Mapping[str, str]] = None,
            quiet: bool = False) -> subprocess.Popen:
    """Start a command with subprocess.Popen without waiting for it.

    Unless quiet is set, the command and the current working dir are logged.
    """
    self.assert_is_local()
    if not quiet:
      logging.debug("SHELL: %s", shlex.join(map(str, args)))
      logging.debug("CWD: %s", os.getcwd())
    return subprocess.Popen(
        args=args,
        bufsize=bufsize,
        shell=shell,
        stdin=stdin,
        stderr=stderr,
        stdout=stdout,
        env=env)

  def sh(self,
         *args: CmdArg,
         shell: bool = False,
         capture_output: bool = False,
         stdout=None,
         stderr=None,
         stdin=None,
         env: Optional[Mapping[str, str]] = None,
         quiet: bool = False,
         check: bool = True) -> subprocess.CompletedProcess:
    """Run a command with subprocess.run and wait for it to finish.

    Raises:
      SubprocessError: If check is set and the command returns a non-zero
        exit code.
    """
    self.assert_is_local()
    if not quiet:
      logging.debug("SHELL: %s", shlex.join(map(str, args)))
      logging.debug("CWD: %s", os.getcwd())
    # check=False: the return code is checked below so that the custom
    # SubprocessError is raised instead of a plain CalledProcessError.
    process = subprocess.run(
        args=args,
        shell=shell,
        stdin=stdin,
        stdout=stdout,
        stderr=stderr,
        env=env,
        capture_output=capture_output,
        check=False)
    if check and process.returncode != 0:
      raise SubprocessError(self, process)
    return process

  def exec_apple_script(self, script: str, *args: str) -> str:
    del script, args
    raise NotImplementedError("AppleScript is only available on MacOS")

  def log(self, *messages: Any, level: int = 2) -> None:
    """Log the space-joined messages.

    Levels 3, 2, 1 and 0 map to logging.DEBUG, INFO, WARNING and ERROR; any
    other value is passed to logging.log() unchanged.
    """
    message_str = " ".join(map(str, messages))
    logging_levels = {
        3: logging.DEBUG,
        2: logging.INFO,
        1: logging.WARNING,
        0: logging.ERROR,
    }
    logging.log(logging_levels.get(level, level), message_str)

  # TODO(cbruni): split into separate list_system_monitoring and
  # disable_system_monitoring methods
  def check_system_monitoring(self, disable: bool = False) -> bool:
    # pylint: disable=unused-argument
    return True

  def get_relative_cpu_speed(self) -> float:
    return 1

  def is_thermal_throttled(self) -> bool:
    return self.get_relative_cpu_speed() < 1

  def disk_usage(self, path: pth.AnyPathLike) -> psutil._common.sdiskusage:
    return psutil.disk_usage(str(self.local_path(path)))

  def cpu_usage(self) -> float:
    """Return the non-idle CPU time share reported by psutil as a fraction."""
    self.assert_is_local()
    return 1 - psutil.cpu_times_percent().idle / 100

  def cpu_details(self) -> Dict[str, Any]:
    """Collect core counts, usage, load and (if available) CPU frequencies."""
    self.assert_is_local()
    details = {
        "physical cores":
            psutil.cpu_count(logical=False),
        "logical cores":
            psutil.cpu_count(logical=True),
        "usage":
            psutil.cpu_percent(  # pytype: disable=attribute-error
                percpu=True, interval=0.1),
        "total usage":
            psutil.cpu_percent(),
        "system load":
            psutil.getloadavg(),
        "info":
            self.cpu,
    }
    try:
      cpu_freq = psutil.cpu_freq()
    except FileNotFoundError as e:
      logging.debug("psutil.cpu_freq() failed (normal on macOS M1): %s", e)
      return details
    # psutil can return None when no frequency information is available.
    if cpu_freq is None:
      return details
    details.update({
        "max frequency": f"{cpu_freq.max:.2f}Mhz",
        "min frequency": f"{cpu_freq.min:.2f}Mhz",
        "current frequency": f"{cpu_freq.current:.2f}Mhz",
    })
    return details

  def system_details(self) -> Dict[str, Any]:
    return {
        "machine": str(self.machine),
        "os": self.os_details(),
        "python": self.python_details(),
        "CPU": self.cpu_details(),
    }

  def os_details(self) -> JsonDict:
    self.assert_is_local()
    return {
        "system": py_platform.system(),
        "release": py_platform.release(),
        "version": py_platform.version(),
        "platform": py_platform.platform(),
    }

  def python_details(self) -> JsonDict:
    self.assert_is_local()
    return {
        "version": py_platform.python_version(),
        "bits": 64 if sys.maxsize > 2**32 else 32,
    }

  def download_to(self, url: str, path: pth.LocalPath) -> pth.LocalPath:
    """Download url to the not yet existing local path and return path.

    Raises:
      OSError: If the URL cannot be retrieved.
    """
    self.assert_is_local()
    logging.debug("DOWNLOAD: %s\n       TO: %s", url, path)
    assert not path.exists(), f"Download destination {path} exists already."
    try:
      urllib.request.urlretrieve(url, path)
    except (urllib.error.HTTPError, urllib.error.URLError) as e:
      raise OSError(f"Could not load {url}") from e
    assert path.exists(), (
        f"Downloading {url} failed. Downloaded file {path} doesn't exist.")
    return path

  def concat_files(self,
                   inputs: Iterable[pth.LocalPath],
                   output: pth.LocalPath,
                   prefix: str = "") -> pth.LocalPath:
    """Write prefix followed by the utf-8 contents of all inputs to output."""
    self.assert_is_local()
    with output.open("w", encoding="utf-8") as output_f:
      if prefix:
        output_f.write(prefix)
      for input_file in inputs:
        assert input_file.is_file()
        with input_file.open(encoding="utf-8") as input_f:
          shutil.copyfileobj(input_f, output_f)
    return output

  def set_main_display_brightness(self, brightness_level: int) -> None:
    raise NotImplementedError(
        "'set_main_display_brightness' is only available on MacOS for now")

  def get_main_display_brightness(self) -> int:
    raise NotImplementedError(
        "'get_main_display_brightness' is only available on MacOS for now")

  def check_autobrightness(self) -> bool:
    raise NotImplementedError(
        "'check_autobrightness' is only available on MacOS for now")

  def screenshot(self, result_path: pth.AnyPath) -> None:
    # TODO: support screen coordinates
    raise NotImplementedError(
        "'screenshot' is only available on MacOS for now")