• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2023 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from __future__ import annotations
6
7import abc
8import functools
9import logging
10import re
11from typing import TYPE_CHECKING, Any, Dict, Generator, Iterator, Optional
12
13from crossbench import path as pth
14from crossbench.plt.base import Environ, ListCmdArgs, Platform, SubprocessError
15from crossbench.plt.remote import RemotePlatformMixin
16
17if TYPE_CHECKING:
18  from crossbench.types import JsonDict
19
20
21class PosixPlatform(Platform, metaclass=abc.ABCMeta):
22  # pylint: disable=locally-disabled, redefined-builtin
23
24  def __init__(self) -> None:
25    super().__init__()
26    self._default_tmp_dir: pth.AnyPath = pth.AnyPosixPath("")
27
28  @functools.cached_property
29  def version(self) -> str:  #pylint: disable=invalid-overridden-method
30    return self.sh_stdout("uname", "-r").strip()
31
32  def _raw_machine_arch(self):
33    if self.is_local:
34      return super()._raw_machine_arch()
35    return self.sh_stdout("uname", "-m").strip()
36
37  def _get_cpu_cores_info(self) -> str:
38    try:
39      max_cores_file = self.path("/sys/devices/system/cpu/possible")
40      _, max_core = self.cat(max_cores_file).strip().split("-", maxsplit=1)
41      cores = int(max_core) + 1
42      return f"{cores} cores"
43    except Exception as e:  # pylint: disable=broad-except
44      logging.debug("Failed to get detailed CPU stats: %s", e)
45      return ""
46
47  _GET_CPONF_PROC_RE: re.Pattern = re.compile(
48      r".*PROCESSORS_CONF[^0-9]+(?P<cores>[0-9]+)")
49
50  def cpu_details(self) -> Dict[str, Any]:
51    if self.is_local:
52      return super().cpu_details()
53    cores = -1
54    if self.which("nproc"):
55      cores = int(self.sh_stdout("nproc"))
56    elif self.which("getconf"):
57      result = self._GET_CPONF_PROC_RE.search(self.sh_stdout("getconf", "-a"))
58      if result:
59        cores = int(result["cores"])
60    return {
61        "physical cores": cores,
62        "info": self.cpu,
63    }
64
65  def os_details(self) -> JsonDict:
66    if self.is_local:
67      return super().os_details()
68    return {
69        "system": self.sh_stdout("uname").strip(),
70        "release": self.sh_stdout("uname", "-r").strip(),
71        "version": self.sh_stdout("uname", "-v").strip(),
72        "platform": self.sh_stdout("uname", "-a").strip(),
73    }
74
75  _PY_VERSION: str = "import sys; print(64 if sys.maxsize > 2**32 else 32)"
76
77  def python_details(self) -> JsonDict:
78    if self.is_local:
79      return super().python_details()
80    if not self.which("python3"):
81      return {"version": "unknown", "bits": 64}
82    return {
83        "version": self.sh_stdout("python3", "--version").strip(),
84        "bits": int(self.sh_stdout("python3", "-c", self._PY_VERSION).strip())
85    }
86
87  def app_version(self, app_or_bin: pth.AnyPathLike) -> str:
88    app_or_bin = self.path(app_or_bin)
89    if not self.exists(app_or_bin):
90      raise ValueError(f"Binary {app_or_bin} does not exist.")
91    return self.sh_stdout(app_or_bin, "--version")
92
93  @property
94  def default_tmp_dir(self) -> pth.AnyPath:
95    if self._default_tmp_dir.parts:
96      return self._default_tmp_dir
97    if self.is_local:
98      self._default_tmp_dir = self.path(super().default_tmp_dir)
99      return self._default_tmp_dir
100    env = self.environ
101
102    for tmp_var in ("TMPDIR", "TEMP", "TMP"):
103      if tmp_var not in env:
104        continue
105      tmp_path = self.path(env[tmp_var])
106      if self.is_dir(tmp_path):
107        self._default_tmp_dir = tmp_path
108        return tmp_path
109    self._default_tmp_dir = self.path("/tmp")
110    assert self.is_dir(self._default_tmp_dir), (
111        f"Fallback tmp dir does not exist: {self._default_tmp_dir}")
112    return self._default_tmp_dir
113
114  def path(self, path: pth.AnyPathLike) -> pth.AnyPath:
115    if self.is_local:
116      return pth.LocalPosixPath(path)
117    return pth.AnyPosixPath(path)
118
119  def which(self, binary_name: pth.AnyPathLike) -> Optional[pth.AnyPath]:
120    if self.is_local:
121      return super().which(binary_name)
122    if not binary_name:
123      raise ValueError("Got empty path")
124    if override := self.lookup_binary_override(binary_name):
125      return override
126    try:
127      if maybe_path := self.sh_stdout("which", self.path(binary_name)).strip():
128        maybe_bin = self.path(maybe_path)
129        if self.exists(maybe_bin):
130          return maybe_bin
131    except SubprocessError:
132      pass
133    return None
134
135  def cat(self, file: pth.AnyPathLike, encoding: str = "utf-8") -> str:
136    if self.is_local:
137      return super().cat(file, encoding)
138    return self.sh_stdout("cat", self.path(file), encoding=encoding)
139
140  def cat_bytes(self, file: pth.AnyPathLike) -> bytes:
141    if self.is_local:
142      return super().cat_bytes(file)
143    return self.sh_stdout_bytes("cat", self.path(file))
144
145  def rm(self,
146         path: pth.AnyPathLike,
147         dir: bool = False,
148         missing_ok: bool = False) -> None:
149    if self.is_local:
150      super().rm(path, dir, missing_ok)
151      return
152    if missing_ok and not self.exists(path):
153      return
154    if dir:
155      self.sh("rm", "-rf", self.path(path))
156    else:
157      self.sh("rm", self.path(path))
158
159  def rename(self, src_path: pth.AnyPathLike,
160             dst_path: pth.AnyPathLike) -> pth.AnyPath:
161    if self.is_local:
162      return super().rename(src_path, dst_path)
163    dst_path = self.path(dst_path)
164    self.sh("mv", self.path(src_path), dst_path)
165    return dst_path
166
167  def home(self) -> pth.AnyPath:
168    if self.is_local:
169      return super().home()
170    return self.path(self.sh_stdout("printenv", "HOME").strip())
171
172  def touch(self, path: pth.AnyPathLike) -> None:
173    if self.is_local:
174      super().touch(path)
175    else:
176      self.sh("touch", self.path(path))
177
178  def mkdir(self,
179            path: pth.AnyPathLike,
180            parents: bool = True,
181            exist_ok: bool = True) -> None:
182    if self.is_local:
183      super().mkdir(path, parents, exist_ok)
184    elif parents or exist_ok:
185      self.sh("mkdir", "-p", self.path(path))
186    else:
187      self.sh("mkdir", "-p", self.path(path))
188
189  def mkdtemp(self,
190              prefix: Optional[str] = None,
191              dir: Optional[pth.AnyPathLike] = None) -> pth.AnyPath:
192    if self.is_local:
193      return super().mkdtemp(prefix, dir)
194    return self._mktemp_sh(is_dir=True, prefix=prefix, dir=dir)
195
196  def mktemp(self,
197             prefix: Optional[str] = None,
198             dir: Optional[pth.AnyPathLike] = None) -> pth.AnyPath:
199    if self.is_local:
200      return super().mktemp(prefix, dir)
201    return self._mktemp_sh(is_dir=False, prefix=prefix, dir=dir)
202
203  def _mktemp_sh(self, is_dir: bool, prefix: Optional[str],
204                 dir: Optional[pth.AnyPathLike]) -> pth.AnyPath:
205    if not dir:
206      dir = self.default_tmp_dir
207    template = self.path(dir) / f"{prefix}.XXXXXXXXXXX"
208    args: ListCmdArgs = ["mktemp"]
209    if is_dir:
210      args.append("-d")
211    args.append(str(template))
212    result = self.sh_stdout(*args)
213    return self.path(result.strip())
214
215  def copy_dir(self, from_path: pth.AnyPathLike,
216               to_path: pth.AnyPathLike) -> pth.AnyPath:
217    if self.is_local:
218      return super().copy_dir(from_path, to_path)
219    from_path = self.path(from_path)
220    to_path = self.path(to_path)
221    if not self.exists(from_path):
222      raise ValueError(f"Cannot copy non-existing source path: {from_path}")
223    self.mkdir(to_path.parent, parents=True, exist_ok=True)
224    self.sh("cp", "-R", from_path, to_path)
225    return to_path
226
227  def copy_file(self, from_path: pth.AnyPathLike,
228                to_path: pth.AnyPathLike) -> pth.AnyPath:
229    if self.is_local:
230      return super().copy_file(from_path, to_path)
231    from_path = self.path(from_path)
232    to_path = self.path(to_path)
233    if not self.exists(from_path):
234      raise ValueError(f"Cannot copy non-existing source path: {from_path}")
235    self.mkdir(to_path.parent, parents=True, exist_ok=True)
236    self.sh("cp", from_path, to_path)
237    return to_path
238
239  def set_file_contents(self,
240                        file: pth.AnyPathLike,
241                        data: str,
242                        encoding: str = "utf-8") -> None:
243    if self.is_local:
244      super().set_file_contents(file, data, encoding)
245      return
246    # TODO: implement stdin bypass for small content
247    dest_file = self.path(file)
248    with self.host_platform.NamedTemporaryFile("push.data") as tmp_file:
249      self.host_platform.set_file_contents(tmp_file, data, encoding=encoding)
250      self.push(tmp_file, dest_file)
251
252  def exists(self, path: pth.AnyPathLike) -> bool:
253    if self.is_local:
254      return super().exists(path)
255    return self.sh("[", "-e", self.path(path), "]", check=False).returncode == 0
256
257  def is_file(self, path: pth.AnyPathLike) -> bool:
258    if self.is_local:
259      return super().is_file(path)
260    return self.sh("[", "-f", self.path(path), "]", check=False).returncode == 0
261
262  def is_dir(self, path: pth.AnyPathLike) -> bool:
263    if self.is_local:
264      return super().is_dir(path)
265    return self.sh("[", "-d", self.path(path), "]", check=False).returncode == 0
266
267  def iterdir(self,
268              path: pth.AnyPathLike) -> Generator[pth.AnyPath, None, None]:
269    if self.is_local:
270      yield from super().iterdir(path)
271      return
272
273    remote_path = self.path(path)
274    if not self.is_dir(remote_path):
275      raise NotADirectoryError(f"Not a directory: {remote_path}")
276
277    for name in self.sh_stdout("ls", "-1",
278                               remote_path).rstrip("\n").split("\n"):
279      yield remote_path / name
280
281  def terminate(self, proc_pid: int) -> None:
282    self.sh("kill", "-s", "TERM", str(proc_pid))
283
284  def process_info(self, pid: int) -> Optional[Dict[str, Any]]:
285    if self.is_local:
286      return super().process_info(pid)
287    try:
288      lines = self.sh_stdout("ps", "-o", "comm", "-p", str(pid)).splitlines()
289      if len(lines) <= 1:
290        return None
291      assert len(lines) == 2, lines
292      tokens = lines[1].split()
293      assert len(tokens) == 1
294      return {"comm": tokens[0]}
295    except SubprocessError:
296      return None
297
298  @property
299  def environ(self) -> Environ:
300    if self.is_local:
301      return super().environ
302    return RemotePosixEnviron(self)
303
304
305class RemotePosixEnviron(Environ):
306
307  def __init__(self, platform: PosixPlatform) -> None:
308    self._platform = platform
309    self._environ = {}
310    for line in self._platform.sh_stdout("env").splitlines():
311      parts = line.split("=", maxsplit=1)
312      if len(parts) == 2:
313        key, value = parts
314        self._environ[key] = value
315      else:
316        assert len(parts) == 1
317        key = parts[0]
318        self._environ[key] = ""
319
320  def __getitem__(self, key: str) -> str:
321    return self._environ.__getitem__(key)
322
323  def __setitem__(self, key: str, item: str) -> None:
324    raise NotImplementedError("Unsupported")
325
326  def __delitem__(self, key: str) -> None:
327    raise NotImplementedError("Unsupported")
328
329  def __iter__(self) -> Iterator[str]:
330    return self._environ.__iter__()
331
332  def __len__(self) -> int:
333    return self._environ.__len__()
334
335
336class RemotePosixPlatform(RemotePlatformMixin, PosixPlatform):
337  pass
338