• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2023 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from __future__ import annotations
6
7import abc
8import collections.abc
9import contextlib
10import datetime as dt
11import functools
12import inspect
13import logging
14import os
15import pathlib
16import platform as py_platform
17import shlex
18import shutil
19import subprocess
20import sys
21import tempfile
22import time
23import urllib.error
24import urllib.parse
25import urllib.request
26from typing import (TYPE_CHECKING, Any, Callable, Dict, Final, Generator,
27                    Iterable, Iterator, List, Mapping, Optional, Sequence,
28                    Tuple, Union)
29
30import psutil
31
32from crossbench import path as pth
33from crossbench.plt.arch import MachineArch
34from crossbench.plt.bin import Binary
35
36if TYPE_CHECKING:
37  from crossbench.path import LocalPath
38  from crossbench.types import JsonDict
39
40
41CmdArg = pth.AnyPathLike
42ListCmdArgs = List[CmdArg]
43TupleCmdArgs = Tuple[CmdArg, ...]
44CmdArgs = Union[ListCmdArgs, TupleCmdArgs]
45
46
47class Environ(collections.abc.MutableMapping, metaclass=abc.ABCMeta):
48  pass
49
50
51class LocalEnviron(Environ):
52
53  def __init__(self) -> None:
54    self._environ = os.environ
55
56  def __getitem__(self, key: str) -> str:
57    return self._environ.__getitem__(key)
58
59  def __setitem__(self, key: str, item: str) -> None:
60    self._environ.__setitem__(key, item)
61
62  def __delitem__(self, key: str) -> None:
63    self._environ.__delitem__(key)
64
65  def __iter__(self) -> Iterator[str]:
66    return self._environ.__iter__()
67
68  def __len__(self) -> int:
69    return self._environ.__len__()
70
71
72class SubprocessError(subprocess.CalledProcessError):
73  """ Custom version that also prints stderr for debugging"""
74
75  def __init__(self, platform: Platform, process) -> None:
76    self.platform = platform
77    super().__init__(process.returncode, shlex.join(map(str, process.args)),
78                     process.stdout, process.stderr)
79
80  def __str__(self) -> str:
81    super_str = super().__str__()
82    if not self.stderr:
83      return f"{self.platform}: {super_str}"
84    return f"{self.platform}: {super_str}\nstderr:{self.stderr.decode()}"
85
86
87_IGNORED_PROCESS_EXCEPTIONS: Final = (psutil.NoSuchProcess, psutil.AccessDenied,
88                                      psutil.ZombieProcess)
89
90DEFAULT_CACHE_DIR = pth.LocalPath(__file__).parents[2] / "cache"
91
92class Platform(abc.ABC):
93  # pylint: disable=locally-disabled, redefined-builtin
94
95  def __init__(self) -> None:
96    self._binary_lookup_override: Dict[str, pth.AnyPath] = {}
97    self._cache_dir: Optional[pth.AnyPath] = None
98    if self.is_local:
99      self._cache_dir = DEFAULT_CACHE_DIR
100
101  def assert_is_local(self) -> None:
102    if self.is_local:
103      return
104    caller = "assert_is_local"
105    caller = inspect.stack()[1].function
106    raise RuntimeError(f"{type(self).__name__}.{caller}(...) is not supported "
107                       "on remote platform")
108
109  @property
110  @abc.abstractmethod
111  def name(self) -> str:
112    pass
113
114  @property
115  @abc.abstractmethod
116  def version(self) -> str:
117    pass
118
119  @property
120  @abc.abstractmethod
121  def device(self) -> str:
122    pass
123
124  @property
125  @abc.abstractmethod
126  def cpu(self) -> str:
127    pass
128
129  @property
130  def full_version(self) -> str:
131    return f"{self.name} {self.version} {self.machine}"
132
133  def __str__(self) -> str:
134    return ".".join(self.key) + (".remote" if self.is_remote else ".local")
135
136  @property
137  def is_remote(self) -> bool:
138    return False
139
140  @property
141  def is_local(self) -> bool:
142    return not self.is_remote
143
144  @property
145  def host_platform(self) -> Platform:
146    return self
147
148  @functools.cached_property
149  def machine(self) -> MachineArch:
150    raw = self._raw_machine_arch()
151    if raw in ("i386", "i686", "x86", "ia32"):
152      return MachineArch.IA32
153    if raw in ("x86_64", "AMD64"):
154      return MachineArch.X64
155    if raw in ("arm64", "aarch64"):
156      return MachineArch.ARM_64
157    if raw in ("arm"):
158      return MachineArch.ARM_32
159    raise NotImplementedError(f"Unsupported machine type: {raw}")
160
161  def _raw_machine_arch(self) -> str:
162    self.assert_is_local()
163    return py_platform.machine()
164
165  @property
166  def is_ia32(self) -> bool:
167    return self.machine == MachineArch.IA32
168
169  @property
170  def is_x64(self) -> bool:
171    return self.machine == MachineArch.X64
172
173  @property
174  def is_arm64(self) -> bool:
175    return self.machine == MachineArch.ARM_64
176
177  @property
178  def key(self) -> Tuple[str, str]:
179    return (self.name, str(self.machine))
180
181  @property
182  def is_macos(self) -> bool:
183    return False
184
185  @property
186  def is_linux(self) -> bool:
187    return False
188
189  @property
190  def is_android(self) -> bool:
191    return False
192
193  @property
194  def is_chromeos(self) -> bool:
195    return False
196
197  @property
198  def is_posix(self) -> bool:
199    return self.is_macos or self.is_linux or self.is_android
200
201  @property
202  def is_win(self) -> bool:
203    return False
204
205  @property
206  def is_remote_ssh(self) -> bool:
207    return False
208
209  @property
210  def environ(self) -> Environ:
211    self.assert_is_local()
212    return LocalEnviron()
213
214  @property
215  def is_battery_powered(self) -> bool:
216    self.assert_is_local()
217    if not psutil.sensors_battery:
218      return False
219    status = psutil.sensors_battery()
220    if not status:
221      return False
222    return not status.power_plugged
223
224  def _search_executable(
225      self,
226      name: str,
227      macos: Sequence[str],
228      win: Sequence[str],
229      linux: Sequence[str],
230      lookup_callable: Callable[[pth.AnyPath], Optional[pth.AnyPath]],
231  ) -> pth.AnyPath:
232    executables: Sequence[str] = []
233    if self.is_macos:
234      executables = macos
235    elif self.is_win:
236      executables = win
237    elif self.is_linux:
238      executables = linux
239    if not executables:
240      raise ValueError(f"Executable {name} not supported on {self}")
241    for name_or_path in executables:
242      path = self.local_path(name_or_path).expanduser()
243      binary = lookup_callable(path)
244      if binary and self.exists(binary):
245        return binary
246    raise ValueError(f"Executable {name} not found on {self}")
247
248  def search_app_or_executable(
249      self,
250      name: str,
251      macos: Sequence[str] = (),
252      win: Sequence[str] = (),
253      linux: Sequence[str] = ()
254  ) -> pth.AnyPath:
255    return self._search_executable(name, macos, win, linux, self.search_app)
256
257  def search_platform_binary(
258      self,
259      name: str,
260      macos: Sequence[str] = (),
261      win: Sequence[str] = (),
262      linux: Sequence[str] = ()
263  ) -> pth.AnyPath:
264    return self._search_executable(name, macos, win, linux, self.search_binary)
265
266  def search_app(self, app_or_bin: pth.AnyPath) -> Optional[pth.AnyPath]:
267    """Look up a application bundle (macos) or binary (all other platforms) in
268    the common search paths.
269    """
270    return self.search_binary(app_or_bin)
271
272  @abc.abstractmethod
273  def search_binary(self, app_or_bin: pth.AnyPathLike) -> Optional[pth.AnyPath]:
274    """Look up a binary in the common search paths based of a path or a single
275    segment path with just the binary name.
276    Returns the location of the binary (and not the .app bundle on macOS).
277    """
278
279  @abc.abstractmethod
280  def app_version(self, app_or_bin: pth.AnyPathLike) -> str:
281    pass
282
283  @property
284  def has_display(self) -> bool:
285    """Return a bool whether the platform has an active display.
286    This can be false on linux without $DISPLAY, true an all other platforms."""
287    return True
288
289  def sleep(self, seconds: Union[int, float, dt.timedelta]) -> None:
290    if isinstance(seconds, dt.timedelta):
291      seconds = seconds.total_seconds()
292    if seconds == 0:
293      return
294    logging.debug("WAIT %ss", seconds)
295    time.sleep(seconds)
296
297  def which(self, binary_name: pth.AnyPathLike) -> Optional[pth.AnyPath]:
298    if not binary_name:
299      raise ValueError("Got empty path")
300    self.assert_is_local()
301    if override := self.lookup_binary_override(binary_name):
302      return override
303    if result := shutil.which(os.fspath(binary_name)):
304      return self.path(result)
305    return None
306
307  def lookup_binary_override(
308      self, binary_name: pth.AnyPathLike) -> Optional[pth.AnyPath]:
309    return self._binary_lookup_override.get(os.fspath(binary_name))
310
311  def set_binary_lookup_override(self, binary_name: pth.AnyPathLike,
312                                 new_path: Optional[pth.AnyPath]):
313    name = os.fspath(binary_name)
314    if new_path is None:
315      prev_result = self._binary_lookup_override.pop(name, None)
316      if prev_result is None:
317        logging.debug(
318            "Could not remove binary override for %s as it was never set",
319            binary_name)
320      return
321    if self.search_binary(new_path) is None:
322      raise ValueError(f"Suggested binary override for {repr(name)} "
323                       f"does not exist: {new_path}")
324    self._binary_lookup_override[name] = new_path
325
326  @contextlib.contextmanager
327  def override_binary(self, binary: Union[pth.AnyPathLike, Binary],
328                      result: Optional[pth.AnyPath]):
329    binary_name: pth.AnyPathLike = ""
330    if isinstance(binary, Binary):
331      if override := binary.platform_path(self):
332        binary_name = override[0]
333      else:
334        raise RuntimeError("Cannot override binary:"
335                           f" {binary} is not supported supported on {self}")
336    else:
337      binary_name = binary
338    prev_override = self.lookup_binary_override(binary_name)
339    self.set_binary_lookup_override(binary_name, result)
340    try:
341      yield
342    finally:
343      self.set_binary_lookup_override(binary_name, prev_override)
344
345  def processes(self,
346                attrs: Optional[List[str]] = None) -> List[Dict[str, Any]]:
347    # TODO(cbruni): support remote platforms
348    assert self.is_local, "Only local platform supported"
349    return self._collect_process_dict(psutil.process_iter(attrs=attrs))
350
351  def process_running(self, process_name_list: List[str]) -> Optional[str]:
352    self.assert_is_local()
353    # TODO(cbruni): support remote platforms
354    for proc in psutil.process_iter(attrs=["name"]):
355      try:
356        if proc.name().lower() in process_name_list:
357          return proc.name()
358      except _IGNORED_PROCESS_EXCEPTIONS:
359        pass
360    return None
361
362  def process_children(self,
363                       parent_pid: int,
364                       recursive: bool = False) -> List[Dict[str, Any]]:
365    self.assert_is_local()
366    # TODO(cbruni): support remote platforms
367    try:
368      process = psutil.Process(parent_pid)
369    except _IGNORED_PROCESS_EXCEPTIONS:
370      return []
371    return self._collect_process_dict(process.children(recursive=recursive))
372
373  def _collect_process_dict(
374      self, process_iterator: Iterable[psutil.Process]) -> List[Dict[str, Any]]:
375    process_info_list: List[Dict[str, Any]] = []
376    for process in process_iterator:
377      try:
378        process_info_list.append(process.as_dict())
379      except _IGNORED_PROCESS_EXCEPTIONS:
380        pass
381    return process_info_list
382
383  def process_info(self, pid: int) -> Optional[Dict[str, Any]]:
384    self.assert_is_local()
385    # TODO(cbruni): support remote platforms
386    try:
387      return psutil.Process(pid).as_dict()
388    except _IGNORED_PROCESS_EXCEPTIONS:
389      return None
390
391  def foreground_process(self) -> Optional[Dict[str, Any]]:
392    return None
393
394  def terminate(self, proc_pid: int) -> None:
395    self.assert_is_local()
396    # TODO(cbruni): support remote platforms
397    process = psutil.Process(proc_pid)
398    for proc in process.children(recursive=True):
399      proc.terminate()
400    process.terminate()
401
402  @property
403  def default_tmp_dir(self) -> pth.AnyPath:
404    self.assert_is_local()
405    return self.path(tempfile.gettempdir())
406
407  def port_forward(self, local_port: int, remote_port: int) -> int:
408    if remote_port != local_port:
409      raise ValueError("Cannot forward a remote port on a local platform.")
410    self.assert_is_local()
411    return local_port
412
413  def stop_port_forward(self, local_port: int) -> None:
414    del local_port
415    self.assert_is_local()
416
417  def reverse_port_forward(self, remote_port: int, local_port: int) -> int:
418    if remote_port != local_port:
419      raise ValueError("Cannot forward a remote port on a local platform.")
420    self.assert_is_local()
421    return remote_port
422
423  def stop_reverse_port_forward(self, remote_port: int) -> None:
424    del remote_port
425    self.assert_is_local()
426
427  def local_cache_dir(self, name: Optional[str] = None) -> pth.LocalPath:
428    return self.local_path(self.cache_dir(name))
429
430  def cache_dir(self, name: Optional[str] = None) -> pth.AnyPath:
431    assert self._cache_dir, "missing cache dir"
432    if not name:
433      dir = self._cache_dir
434    else:
435      dir = self._cache_dir / pth.safe_filename(name)
436    self.mkdir(dir, parents=True, exist_ok=True)
437    return dir
438
439  def set_cache_dir(self, path: pth.AnyPath) -> None:
440    self._cache_dir = path
441    self.mkdir(path, parents=True, exist_ok=True)
442
443  def cat(self, file: pth.AnyPathLike, encoding: str = "utf-8") -> str:
444    """Meow! I return the file contents as a str."""
445    with self.local_path(file).open(encoding=encoding) as f:
446      return f.read()
447
448  def cat_bytes(self, file: pth.AnyPathLike) -> bytes:
449    """Hiss! I return the file contents as bytes."""
450    with self.local_path(file).open("rb") as f:
451      return f.read()
452
453  def get_file_contents(self,
454                        file: pth.AnyPathLike,
455                        encoding: str = "utf-8") -> str:
456    return self.cat(file, encoding)
457
458  def set_file_contents(self,
459                        file: pth.AnyPathLike,
460                        data: str,
461                        encoding: str = "utf-8") -> None:
462    with self.local_path(file).open("w", encoding=encoding) as f:
463      f.write(data)
464
465  def pull(self, from_path: pth.AnyPath,
466           to_path: pth.LocalPath) -> pth.LocalPath:
467    """ Download / Copy a (remote) file to the local filesystem.
468    By default this is just a copy operation on the local filesystem.
469    """
470    self.assert_is_local()
471    return self.local_path(self.copy_file(from_path, to_path))
472
473  def push(self, from_path: pth.LocalPath, to_path: pth.AnyPath) -> pth.AnyPath:
474    """ Copy a local file to this (remote) platform.
475    By default this is just a copy operation on the local filesystem.
476    """
477    self.assert_is_local()
478    return self.copy_file(from_path, to_path)
479
480  def copy(self, from_path: pth.AnyPath, to_path: pth.AnyPath) -> pth.AnyPath:
481    """ Convenience implementation for copying local files and dirs """
482    if not self.exists(from_path):
483      raise ValueError(f"Cannot copy non-existing source path: {from_path}")
484    if self.is_dir(from_path):
485      return self.copy_dir(from_path, to_path)
486    return self.copy_file(from_path, to_path)
487
488  def copy_dir(self, from_path: pth.AnyPathLike,
489               to_path: pth.AnyPathLike) -> pth.AnyPath:
490    from_path = self.local_path(from_path)
491    to_path = self.local_path(to_path)
492    self.mkdir(to_path.parent, parents=True, exist_ok=True)
493    shutil.copytree(os.fspath(from_path), os.fspath(to_path))
494    return to_path
495
496  def copy_file(self, from_path: pth.AnyPathLike,
497                to_path: pth.AnyPathLike) -> pth.AnyPath:
498    from_path = self.local_path(from_path)
499    to_path = self.local_path(to_path)
500    self.mkdir(to_path.parent, parents=True, exist_ok=True)
501    shutil.copy2(os.fspath(from_path), os.fspath(to_path))
502    return to_path
503
504  def rm(self,
505         path: pth.AnyPathLike,
506         dir: bool = False,
507         missing_ok: bool = False) -> None:
508    """Remove a single file on this platform."""
509    path = self.local_path(path)
510    if dir:
511      if missing_ok and not self.exists(path):
512        return
513      shutil.rmtree(os.fspath(path))
514    else:
515      path.unlink(missing_ok)
516
517  def rename(self, src_path: pth.AnyPathLike,
518             dst_path: pth.AnyPathLike) -> pth.AnyPath:
519    """Remove a single file on this platform."""
520    return self.local_path(src_path).rename(dst_path)
521
522  def symlink_or_copy(self, src: pth.AnyPathLike,
523                      dst: pth.AnyPathLike) -> pth.AnyPath:
524    """Windows does not support symlinking without admin support.
525    Copy files on windows (see WinPlatform) but symlink everywhere else."""
526    assert not self.is_win, "Unsupported operation 'symlink_or_copy' on windows"
527    dst_path = self.local_path(dst)
528    dst_path.symlink_to(self.path(src))
529    return dst_path
530
531  def path(self, path: pth.AnyPathLike) -> pth.AnyPath:
532    """"Used to convert any paths and strings to a platform specific
533    remote path.
534    For instance a remote ADB platform on windows returns posix paths:
535      posix_path = adb_remote_platform.patch(windows_path)
536    This is used when passing out platform specific paths to remote shell
537    commands.
538    """
539    return self.local_path(path)
540
541  def local_path(self, path: pth.AnyPathLike) -> pth.LocalPath:
542    self.assert_is_local()
543    return pth.LocalPath(path)
544
545  def absolute(self, path: pth.AnyPathLike) -> pth.AnyPath:
546    """Convert an arbitrary path to a platform-specific absolute path"""
547    platform_path: pth.AnyPath = self.path(path)
548    if platform_path.is_absolute():
549      return platform_path
550    if self.is_local:
551      return self.local_path(platform_path).absolute()
552    raise RuntimeError(
553        f"Converting relative to absolute paths is not supported on {self}")
554
555  def home(self) -> pth.AnyPath:
556    return pathlib.Path.home()
557
558  def touch(self, path: pth.AnyPathLike) -> None:
559    self.local_path(path).touch(exist_ok=True)
560
561  def mkdir(self,
562            path: pth.AnyPathLike,
563            parents: bool = True,
564            exist_ok: bool = True) -> None:
565    self.local_path(path).mkdir(parents=parents, exist_ok=exist_ok)
566
567  @contextlib.contextmanager
568  def NamedTemporaryFile(  # pylint: disable=invalid-name
569      self,
570      prefix: Optional[str] = None,
571      dir: Optional[pth.AnyPathLike] = None):
572    tmp_file: LocalPath = self.host_platform.local_path(
573        self.host_platform.mktemp(prefix, dir))
574    try:
575      yield tmp_file
576    finally:
577      self.rm(tmp_file, missing_ok=True)
578
579  def mkdtemp(self,
580              prefix: Optional[str] = None,
581              dir: Optional[pth.AnyPathLike] = None) -> pth.AnyPath:
582    self.assert_is_local()
583    return self.path(tempfile.mkdtemp(prefix=prefix, dir=dir))
584
585  def mktemp(self,
586             prefix: Optional[str] = None,
587             dir: Optional[pth.AnyPathLike] = None) -> pth.AnyPath:
588    self.assert_is_local()
589    fd, name = tempfile.mkstemp(prefix=prefix, dir=dir)
590    os.close(fd)
591    return self.path(name)
592
593  @contextlib.contextmanager
594  def TemporaryDirectory(  # pylint: disable=invalid-name
595      self,
596      prefix: Optional[str] = None,
597      dir: Optional[pth.AnyPathLike] = None):
598    tmp_dir = self.mkdtemp(prefix, dir)
599    try:
600      yield tmp_dir
601    finally:
602      self.rm(tmp_dir, dir=True, missing_ok=True)
603
604  def exists(self, path: pth.AnyPathLike) -> bool:
605    return self.local_path(path).exists()
606
607  def is_file(self, path: pth.AnyPathLike) -> bool:
608    return self.local_path(path).is_file()
609
610  def is_dir(self, path: pth.AnyPathLike) -> bool:
611    return self.local_path(path).is_dir()
612
613  def iterdir(self,
614              path: pth.AnyPathLike) -> Generator[pth.AnyPath, None, None]:
615    return self.local_path(path).iterdir()
616
617  def glob(self, path: pth.AnyPathLike,
618           pattern: str) -> Generator[pth.AnyPath, None, None]:
619    # TODO: support remotely
620    return self.local_path(path).glob(pattern)
621
622  def file_size(self, path: pth.AnyPathLike) -> int:
623    # TODO: support remotely
624    return self.local_path(path).stat().st_size
625
626  def sh_stdout(self,
627                *args: CmdArg,
628                shell: bool = False,
629                quiet: bool = False,
630                encoding: str = "utf-8",
631                stdin=None,
632                env: Optional[Mapping[str, str]] = None,
633                check: bool = True) -> str:
634    result = self.sh_stdout_bytes(
635        *args, shell=shell, quiet=quiet, stdin=stdin, env=env, check=check)
636    return result.decode(encoding)
637
638  def sh_stdout_bytes(self,
639                      *args: CmdArg,
640                      shell: bool = False,
641                      quiet: bool = False,
642                      stdin=None,
643                      env: Optional[Mapping[str, str]] = None,
644                      check: bool = True) -> bytes:
645    completed_process = self.sh(
646        *args,
647        shell=shell,
648        capture_output=True,
649        quiet=quiet,
650        stdin=stdin,
651        env=env,
652        check=check)
653    return completed_process.stdout
654
655  def popen(self,
656            *args: CmdArg,
657            bufsize=-1,
658            shell: bool = False,
659            stdout=None,
660            stderr=None,
661            stdin=None,
662            env: Optional[Mapping[str, str]] = None,
663            quiet: bool = False) -> subprocess.Popen:
664    self.assert_is_local()
665    if not quiet:
666      logging.debug("SHELL: %s", shlex.join(map(str, args)))
667      logging.debug("CWD: %s", os.getcwd())
668    return subprocess.Popen(
669        args=args,
670        bufsize=bufsize,
671        shell=shell,
672        stdin=stdin,
673        stderr=stderr,
674        stdout=stdout,
675        env=env)
676
677  def sh(self,
678         *args: CmdArg,
679         shell: bool = False,
680         capture_output: bool = False,
681         stdout=None,
682         stderr=None,
683         stdin=None,
684         env: Optional[Mapping[str, str]] = None,
685         quiet: bool = False,
686         check: bool = True) -> subprocess.CompletedProcess:
687    self.assert_is_local()
688    if not quiet:
689      logging.debug("SHELL: %s", shlex.join(map(str, args)))
690      logging.debug("CWD: %s", os.getcwd())
691    process = subprocess.run(
692        args=args,
693        shell=shell,
694        stdin=stdin,
695        stdout=stdout,
696        stderr=stderr,
697        env=env,
698        capture_output=capture_output,
699        check=False)
700    if check and process.returncode != 0:
701      raise SubprocessError(self, process)
702    return process
703
704  def exec_apple_script(self, script: str, *args: str) -> str:
705    del script, args
706    raise NotImplementedError("AppleScript is only available on MacOS")
707
708  def log(self, *messages: Any, level: int = 2) -> None:
709    message_str = " ".join(map(str, messages))
710    if level == 3:
711      level = logging.DEBUG
712    if level == 2:
713      level = logging.INFO
714    if level == 1:
715      level = logging.WARNING
716    if level == 0:
717      level = logging.ERROR
718    logging.log(level, message_str)
719
720  # TODO(cbruni): split into separate list_system_monitoring and
721  # disable_system_monitoring methods
722  def check_system_monitoring(self, disable: bool = False) -> bool:
723    # pylint: disable=unused-argument
724    return True
725
726  def get_relative_cpu_speed(self) -> float:
727    return 1
728
729  def is_thermal_throttled(self) -> bool:
730    return self.get_relative_cpu_speed() < 1
731
732  def disk_usage(self, path: pth.AnyPathLike) -> psutil._common.sdiskusage:
733    return psutil.disk_usage(str(self.local_path(path)))
734
735  def cpu_usage(self) -> float:
736    self.assert_is_local()
737    return 1 - psutil.cpu_times_percent().idle / 100
738
739  def cpu_details(self) -> Dict[str, Any]:
740    self.assert_is_local()
741    details = {
742        "physical cores":
743            psutil.cpu_count(logical=False),
744        "logical cores":
745            psutil.cpu_count(logical=True),
746        "usage":
747            psutil.cpu_percent(  # pytype: disable=attribute-error
748                percpu=True, interval=0.1),
749        "total usage":
750            psutil.cpu_percent(),
751        "system load":
752            psutil.getloadavg(),
753        "info":
754            self.cpu,
755    }
756    try:
757      cpu_freq = psutil.cpu_freq()
758    except FileNotFoundError as e:
759      logging.debug("psutil.cpu_freq() failed (normal on macOS M1): %s", e)
760      return details
761    details.update({
762        "max frequency": f"{cpu_freq.max:.2f}Mhz",
763        "min frequency": f"{cpu_freq.min:.2f}Mhz",
764        "current frequency": f"{cpu_freq.current:.2f}Mhz",
765    })
766    return details
767
768  def system_details(self) -> Dict[str, Any]:
769    return {
770        "machine": str(self.machine),
771        "os": self.os_details(),
772        "python": self.python_details(),
773        "CPU": self.cpu_details(),
774    }
775
776  def os_details(self) -> JsonDict:
777    self.assert_is_local()
778    return {
779        "system": py_platform.system(),
780        "release": py_platform.release(),
781        "version": py_platform.version(),
782        "platform": py_platform.platform(),
783    }
784
785  def python_details(self) -> JsonDict:
786    self.assert_is_local()
787    return {
788        "version": py_platform.python_version(),
789        "bits": 64 if sys.maxsize > 2**32 else 32,
790    }
791
792  def download_to(self, url: str, path: pth.LocalPath) -> pth.LocalPath:
793    self.assert_is_local()
794    logging.debug("DOWNLOAD: %s\n       TO: %s", url, path)
795    assert not path.exists(), f"Download destination {path} exists already."
796    try:
797      urllib.request.urlretrieve(url, path)
798    except (urllib.error.HTTPError, urllib.error.URLError) as e:
799      raise OSError(f"Could not load {url}") from e
800    assert path.exists(), (
801        f"Downloading {url} failed. Downloaded file {path} doesn't exist.")
802    return path
803
804  def concat_files(self,
805                   inputs: Iterable[pth.LocalPath],
806                   output: pth.LocalPath,
807                   prefix: str = "") -> pth.LocalPath:
808    self.assert_is_local()
809    with output.open("w", encoding="utf-8") as output_f:
810      if prefix:
811        output_f.write(prefix)
812      for input_file in inputs:
813        assert input_file.is_file()
814        with input_file.open(encoding="utf-8") as input_f:
815          shutil.copyfileobj(input_f, output_f)
816    return output
817
818  def set_main_display_brightness(self, brightness_level: int) -> None:
819    raise NotImplementedError(
820        "'set_main_display_brightness' is only available on MacOS for now")
821
822  def get_main_display_brightness(self) -> int:
823    raise NotImplementedError(
824        "'get_main_display_brightness' is only available on MacOS for now")
825
826  def check_autobrightness(self) -> bool:
827    raise NotImplementedError(
828        "'check_autobrightness' is only available on MacOS for now")
829
830  def screenshot(self, result_path: pth.AnyPath) -> None:
831    # TODO: support screen coordinates
832    raise NotImplementedError(
833        "'screenshot' is only available on MacOS for now")
834