• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2022 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from __future__ import annotations
6
7import collections
8import datetime as dt
9import pathlib
10import shlex
11from subprocess import CompletedProcess
12from typing import (TYPE_CHECKING, Any, Dict, Iterable, List, Mapping,
13                    Optional, Sequence, Union)
14
15import psutil
16
17from crossbench import path as pth
18from crossbench import plt
19from crossbench.benchmarks.base import SubStoryBenchmark
20from crossbench.cli.cli import CrossBenchCLI
21from crossbench.plt.android_adb import Adb, AndroidAdbPlatform
22from crossbench.plt.base import MachineArch, Platform
23from crossbench.plt.chromeos_ssh import ChromeOsSshPlatform
24from crossbench.plt.linux import LinuxPlatform
25from crossbench.plt.linux_ssh import LinuxSshPlatform
26from crossbench.plt.macos import MacOSPlatform
27from crossbench.plt.win import WinPlatform
28from crossbench.runner.run import Run
29from crossbench.stories.story import Story
30
31if TYPE_CHECKING:
32  from crossbench.plt.base import ListCmdArgs, TupleCmdArgs
33  from crossbench.runner.runner import Runner
34
35
# One gibibyte in bytes (2**30). The previous value used 1014 instead of 1024.
GIB = 1024**3
37
38
39
class MockPlatformMixin:
  """Mixin that swaps shell execution and system probing for canned data.

  Shell invocations are recorded instead of executed: outputs are queued
  via `expect_sh()` or the `sh_results` setter and handed out in FIFO order
  by `sh_stdout_bytes()`. When commands were registered through
  `expect_sh()`, each incoming command is additionally compared against the
  next expected one.
  """

  def __init__(self, *args, is_battery_powered=False, **kwargs):
    """Create the recording state, then continue along the MRO.

    Args:
      is_battery_powered: Value reported by the `is_battery_powered`
        property.
      *args, **kwargs: Forwarded untouched to the next `__init__`.
    """
    self._is_battery_powered = is_battery_powered
    # All recording state is created before delegating to the next __init__.
    # Every command passed to sh_stdout_bytes(), in call order.
    self._sh_cmds: List[TupleCmdArgs] = []
    # None until expect_sh() is called with a command for the first time.
    self._expected_sh_cmds: Optional[List[TupleCmdArgs]] = None
    # FIFO queue of the outputs returned by sh_stdout_bytes().
    self._sh_results: List[bytes] = []
    # Every payload passed to set_file_contents(), grouped by path.
    self.file_contents: Dict[pth.AnyPath, List[str]] = (
        collections.defaultdict(list))
    # Every duration passed to sleep().
    self.sleeps: List[dt.timedelta] = []
    super().__init__(*args, **kwargs)

  def expect_sh(self,
                *args: Union[str, pathlib.Path],
                result: Union[str, bytes] = "") -> None:
    """Queue one shell output and, if `args` is given, the expected command.

    Args:
      *args: The command expected for the matching shell call. When empty,
        only the output is queued and no expectation is recorded.
      result: Output for that call; str values are encoded as UTF-8.
    """
    if args:
      cmds = self._expected_sh_cmds
      if cmds is None:
        # First expectation: switch on command verification.
        cmds = self._expected_sh_cmds = []
      cmds.append(self._convert_sh_args(*args))
    output = result.encode("utf-8") if isinstance(result, str) else result
    assert isinstance(output, bytes)
    self._sh_results.append(output)

  def _convert_sh_args(self, *args: Union[str, pathlib.Path]) -> TupleCmdArgs:
    """Return `args` as a tuple, stringifying anything but str and paths."""
    return tuple(
        arg if isinstance(arg, (str, pathlib.PurePath)) else str(arg)
        for arg in args)

  @property
  def sh_results(self) -> List[bytes]:
    """A copy of the outputs that have not been consumed yet."""
    return self._sh_results.copy()

  @sh_results.setter
  def sh_results(self, results: Iterable[Union[str, bytes]]) -> None:
    """Queue `results` as outputs without recording expected commands.

    Asserts that no outputs are pending and that no expected commands are
    registered.
    """
    assert not self._sh_results, "Trying to override non-consumed results"
    assert not self._expected_sh_cmds, (
        "expect_sh() cannot be used together with sh_results")
    for output in results:
      self.expect_sh(result=output)

  @property
  def sh_cmds(self) -> List[TupleCmdArgs]:
    """A copy of all commands recorded so far."""
    return self._sh_cmds.copy()

  @property
  def expected_sh_cmds(self) -> Optional[List[TupleCmdArgs]]:
    """A copy of the still-pending expected commands, or None if unused."""
    pending = self._expected_sh_cmds
    return None if pending is None else list(pending)

  @property
  def name(self) -> str:
    """The name of the wrapped platform, prefixed with "mock."."""
    return f"mock.{super().name}"

  @property
  def machine(self) -> MachineArch:
    """Always reports MachineArch.ARM_64."""
    return MachineArch.ARM_64

  @property
  def version(self) -> str:
    """Fixed fake version string."""
    return "1.2.3.4.5"

  @property
  def device(self) -> str:
    """Fixed fake device name."""
    return "TestBook Pro"

  @property
  def cpu(self) -> str:
    """Fixed fake CPU description."""
    return "Mega CPU @ 3.00GHz"

  @property
  def is_battery_powered(self) -> bool:
    """The value handed to the constructor."""
    return self._is_battery_powered

  def is_thermal_throttled(self) -> bool:
    """The mock never reports thermal throttling."""
    return False

  def disk_usage(self, path: pathlib.Path):
    """Return fixed numbers (100 GIB total, 20 used, 80 free) for any path."""
    del path
    total, used, free = 100 * GIB, 20 * GIB, 80 * GIB
    # pylint: disable=protected-access
    return psutil._common.sdiskusage(
        total=total, used=used, free=free, percent=20)

  def cpu_usage(self) -> float:
    """Fixed fake CPU usage value."""
    return 0.1

  def cpu_details(self) -> Dict[str, Any]:
    """Fixed fake core counts plus the `cpu` description."""
    details: Dict[str, Any] = {"physical cores": 2, "logical cores": 4}
    details["info"] = self.cpu
    return details

  def set_file_contents(self,
                        file: pth.AnyPathLike,
                        data: str,
                        encoding: str = "utf-8") -> None:
    """Record `data` under the platform path of `file`; nothing is written.

    The `encoding` argument is accepted but ignored.
    """
    del encoding
    target = self.path(file)
    self.file_contents[target].append(data)

  def system_details(self):
    """Fixed fake system details."""
    return {"CPU": "20-core 3.1 GHz"}

  def sleep(self, duration):
    """Record the requested duration instead of sleeping."""
    self.sleeps.append(duration)

  def processes(self, attrs=()):
    """The mock reports no processes."""
    del attrs
    return []

  def process_children(self, parent_pid: int, recursive=False):
    """The mock reports no child processes."""
    del parent_pid, recursive
    return []

  def foreground_process(self):
    """The mock reports no foreground process."""
    return None

  def search_platform_binary(
      self,
      name: str,
      macos: Sequence[str] = (),
      win: Sequence[str] = (),
      linux: Sequence[str] = ()
  ) -> pth.AnyPath:
    """Pretend every binary lives at /usr/bin/<name>; hints are ignored."""
    del macos, win, linux
    return self.path(f"/usr/bin/{name}")

  def sh_stdout_bytes(self,
                      *args: Union[str, pathlib.Path],
                      shell: bool = False,
                      quiet: bool = False,
                      stdin=None,
                      env: Optional[Mapping[str, str]] = None,
                      check: bool = True) -> bytes:
    """Record the command `args` and return the next queued output.

    All keyword options are accepted but ignored.

    Raises:
      AssertionError: If expected commands are in use and the next one is
        missing or does not match `args` after str conversion.
      ValueError: If no queued output is left.
    """
    del shell, quiet, stdin, env, check
    pending = self._expected_sh_cmds
    if pending is not None:
      assert pending, (
          f"Missing expected sh_cmds, but got: {args}")
      # Compare on str level, commands may mix str and path objects.
      expected = tuple(str(part) for part in pending[0])
      str_args = tuple(str(part) for part in args)
      assert expected == str_args, (f"After {len(self._sh_cmds)} cmds: \n"
                                    f"  expected: {expected}\n"
                                    f"  got:      {str_args}")
      del pending[0]
    self._sh_cmds.append(args)
    if self._sh_results:
      return self._sh_results.pop(0)
    cmd = shlex.join(map(str, args))
    raise ValueError(f"After {len(self._sh_cmds)} cmds: "
                     f"MockPlatform has no more sh outputs for cmd: {cmd}")

  def sh(self,
         *args: Union[str, pathlib.Path],
         shell: bool = False,
         capture_output: bool = False,
         stdout=None,
         stderr=None,
         stdin=None,
         env: Optional[Mapping[str, str]] = None,
         quiet: bool = False,
         check: bool = False):
    """Route the call through `sh_stdout` and report a zero exit code.

    The stream-related options (`capture_output`, `stdout`, `stderr`,
    `stdin`) are accepted but ignored.
    """
    del capture_output, stderr, stdin, stdout
    forwarded = {"shell": shell, "quiet": quiet, "env": env, "check": check}
    self.sh_stdout(*args, **forwarded)
    # TODO: Generalize this in the future, to mimic failing `sh` calls.
    return CompletedProcess(args, 0)
208
209
class PosixMockPlatformMixin(MockPlatformMixin):
  """MockPlatformMixin flavor used by the POSIX-style mock platforms.

  Adds no behavior of its own on top of MockPlatformMixin.
  """
212
213
class WinMockPlatformMixin(MockPlatformMixin):
  """MockPlatformMixin flavor that produces Windows-style pure paths."""

  # TODO: use wrapper fake path to get windows-path formatting by default
  # when running on posix.

  def path(self, path: pth.AnyPathLike) -> pth.AnyPath:
    """Wrap `path` in a PureWindowsPath."""
    windows_path = pathlib.PureWindowsPath(path)
    return windows_path
220
221
class LinuxMockPlatform(PosixMockPlatformMixin, LinuxPlatform):
  """LinuxPlatform with the PosixMockPlatformMixin overrides applied."""
224
225
class LinuxSshMockPlatform(PosixMockPlatformMixin, LinuxSshPlatform):
  """LinuxSshPlatform with the PosixMockPlatformMixin overrides applied."""
228
229
class ChromeOsSshMockPlatform(PosixMockPlatformMixin, ChromeOsSshPlatform):
  """ChromeOsSshPlatform with the PosixMockPlatformMixin overrides applied."""
232
233
class MacOsMockPlatform(PosixMockPlatformMixin, MacOSPlatform):
  """MacOSPlatform with the PosixMockPlatformMixin overrides applied."""
236
237
class WinMockPlatform(WinMockPlatformMixin, WinPlatform):
  """WinPlatform with the WinMockPlatformMixin overrides applied."""
240
241
class MockAdb(Adb):
  """Adb subclass whose server lifecycle methods do nothing.

  Presumably this keeps tests from starting or stopping a real adb server;
  confirm against the Adb base class.
  """

  def start_server(self) -> None:
    """No-op replacement for starting the adb server."""

  def stop_server(self) -> None:
    """No-op replacement for stopping the adb server."""

  def kill_server(self) -> None:
    """No-op replacement for killing the adb server."""
252
253
class AndroidAdbMockPlatform(MockPlatformMixin, AndroidAdbPlatform):
  """AndroidAdbPlatform with the MockPlatformMixin overrides applied."""
256
257
class GenericMockPlatform(MockPlatformMixin, Platform):
  """The base Platform with the MockPlatformMixin overrides applied."""
260
261
# Bind MockPlatform to the mock class that matches plt.PLATFORM (presumably
# the platform the tests run on -- confirm). The checks are tried in the
# order linux, macos, win; anything else is rejected while this module is
# being imported.
if plt.PLATFORM.is_linux:
  MockPlatform = LinuxMockPlatform
elif plt.PLATFORM.is_macos:
  MockPlatform = MacOsMockPlatform
elif plt.PLATFORM.is_win:
  MockPlatform = WinMockPlatform
else:
  raise RuntimeError(f"Unsupported platform: {plt.PLATFORM}")
270
271
class MockStory(Story):
  """Story stub with two fixed story names and a `run` that does nothing."""

  @classmethod
  def all_story_names(cls):
    """Return the two hard-coded story names."""
    story_names = ["story_1", "story_2"]
    return story_names

  def run(self, run: Run) -> None:
    """No-op: the mock story performs no work for `run`."""
280
281
class MockBenchmark(SubStoryBenchmark):
  # Minimal SubStoryBenchmark for tests: NAME is "mock-benchmark" and the
  # default story class is MockStory.
  # NOTE(review): documented with comments instead of a docstring in case the
  # benchmark base class reads __doc__ (e.g. for help texts) -- confirm.
  NAME = "mock-benchmark"
  DEFAULT_STORY_CLS = MockStory
285
286
class MockCLI(CrossBenchCLI):
  # CrossBenchCLI variant for tests: the platform is injected through the
  # required `platform` keyword argument, and the runner created by
  # _get_runner() is kept on `self.runner`.
  runner: Runner
  platform: Platform

  def __init__(self, *args, **kwargs) -> None:
    """Take the `platform` kwarg for this class and forward everything else.

    Raises:
      KeyError: If no `platform` keyword argument was supplied.
    """
    injected_platform = kwargs.pop("platform")
    self.platform = injected_platform
    super().__init__(*args, **kwargs)

  def _get_runner(self, args, benchmark, env_config, env_validation_mode,
                  timing):
    """Create the runner on the injected platform, store it and return it.

    When `args.out_dir` is not set it is replaced with the fixed path
    "/results", which is asserted not to exist yet.
    """
    if not args.out_dir:
      # Use stable mock out dir
      mock_out_dir = pathlib.Path("/results")
      args.out_dir = mock_out_dir
      assert not mock_out_dir.exists()
    runner_cls = self.RUNNER_CLS
    runner_kwargs = runner_cls.kwargs_from_cli(args)
    runner = runner_cls(
        benchmark=benchmark,
        env_config=env_config,
        env_validation_mode=env_validation_mode,
        timing=timing,
        **runner_kwargs,
        # Use custom platform
        platform=self.platform)
    self.runner = runner
    return runner
311