1# Copyright 2022 The Chromium Authors 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5from __future__ import annotations 6 7import collections 8import datetime as dt 9import pathlib 10import shlex 11from subprocess import CompletedProcess 12from typing import (TYPE_CHECKING, Any, Dict, Iterable, List, Mapping, 13 Optional, Sequence, Union) 14 15import psutil 16 17from crossbench import path as pth 18from crossbench import plt 19from crossbench.benchmarks.base import SubStoryBenchmark 20from crossbench.cli.cli import CrossBenchCLI 21from crossbench.plt.android_adb import Adb, AndroidAdbPlatform 22from crossbench.plt.base import MachineArch, Platform 23from crossbench.plt.chromeos_ssh import ChromeOsSshPlatform 24from crossbench.plt.linux import LinuxPlatform 25from crossbench.plt.linux_ssh import LinuxSshPlatform 26from crossbench.plt.macos import MacOSPlatform 27from crossbench.plt.win import WinPlatform 28from crossbench.runner.run import Run 29from crossbench.stories.story import Story 30 31if TYPE_CHECKING: 32 from crossbench.plt.base import ListCmdArgs, TupleCmdArgs 33 from crossbench.runner.runner import Runner 34 35 36GIB = 1014**3 37 38 39 40class MockPlatformMixin: 41 42 def __init__(self, *args, is_battery_powered=False, **kwargs): 43 self._is_battery_powered = is_battery_powered 44 # Cache some helper properties that might fail under pyfakefs. 45 self._sh_cmds: List[TupleCmdArgs] = [] 46 self._expected_sh_cmds: Optional[List[TupleCmdArgs]] = None 47 self._sh_results: List[bytes] = [] 48 self.file_contents: Dict[pth.AnyPath, List[str]] = ( 49 collections.defaultdict(list)) 50 self.sleeps: List[dt.timedelta] = [] 51 super().__init__(*args, **kwargs) 52 53 def expect_sh(self, 54 *args: Union[str, pathlib.Path], 55 result: Union[str, bytes] = "") -> None: 56 if args: 57 if self._expected_sh_cmds is None: 58 self._expected_sh_cmds = [] 59 self._expected_sh_cmds.append(self._convert_sh_args(*args)) 60 if isinstance(result, str): 61 result = result.encode("utf-8") 62 assert isinstance(result, bytes) 63 self._sh_results.append(result) 64 65 def _convert_sh_args(self, *args: Union[str, pathlib.Path]) -> TupleCmdArgs: 66 converted_args : ListCmdArgs = [] 67 for arg in args: 68 if not isinstance(arg, (str, pathlib.PurePath)): 69 arg = str(arg) 70 converted_args.append(arg) 71 return tuple(converted_args) 72 73 @property 74 def sh_results(self) -> List[bytes]: 75 return list(self._sh_results) 76 77 @sh_results.setter 78 def sh_results(self, results: Iterable[Union[str, bytes]]) -> None: 79 assert not self._sh_results, "Trying to override non-consumed results" 80 assert not self._expected_sh_cmds, ( 81 "expect_sh() cannot be used together with sh_results") 82 for result in results: 83 self.expect_sh(result=result) 84 85 @property 86 def sh_cmds(self) -> List[TupleCmdArgs]: 87 return list(self._sh_cmds) 88 89 @property 90 def expected_sh_cmds(self) -> Optional[List[TupleCmdArgs]]: 91 if self._expected_sh_cmds is None: 92 return None 93 return list(self._expected_sh_cmds) 94 95 @property 96 def name(self) -> str: 97 return f"mock.{super().name}" 98 99 @property 100 def machine(self) -> MachineArch: 101 return MachineArch.ARM_64 102 103 @property 104 def version(self) -> str: 105 return "1.2.3.4.5" 106 107 @property 108 def device(self) -> str: 109 return "TestBook Pro" 110 111 @property 112 def cpu(self) -> str: 113 return "Mega CPU @ 3.00GHz" 114 115 @property 116 def is_battery_powered(self) -> bool: 117 return self._is_battery_powered 118 119 def is_thermal_throttled(self) -> bool: 120 return False 121 122 def disk_usage(self, path: pathlib.Path): 123 del path 124 # pylint: disable=protected-access 125 return psutil._common.sdiskusage( 126 total=GIB * 100, used=20 * GIB, free=80 * GIB, percent=20) 127 128 def cpu_usage(self) -> float: 129 return 0.1 130 131 def cpu_details(self) -> Dict[str, Any]: 132 return {"physical cores": 2, "logical cores": 4, "info": self.cpu} 133 134 def set_file_contents(self, 135 file: pth.AnyPathLike, 136 data: str, 137 encoding: str = "utf-8") -> None: 138 del encoding 139 file_path = self.path(file) 140 self.file_contents[file_path].append(data) 141 142 def system_details(self): 143 return {"CPU": "20-core 3.1 GHz"} 144 145 def sleep(self, duration): 146 self.sleeps.append(duration) 147 148 def processes(self, attrs=()): 149 del attrs 150 return [] 151 152 def process_children(self, parent_pid: int, recursive=False): 153 del parent_pid, recursive 154 return [] 155 156 def foreground_process(self): 157 return None 158 159 def search_platform_binary( 160 self, 161 name: str, 162 macos: Sequence[str] = (), 163 win: Sequence[str] = (), 164 linux: Sequence[str] = () 165 ) -> pth.AnyPath: 166 del macos, win, linux 167 return self.path(f"/usr/bin/{name}") 168 169 def sh_stdout_bytes(self, 170 *args: Union[str, pathlib.Path], 171 shell: bool = False, 172 quiet: bool = False, 173 stdin=None, 174 env: Optional[Mapping[str, str]] = None, 175 check: bool = True) -> bytes: 176 del shell, quiet, stdin, env, check 177 if self._expected_sh_cmds is not None: 178 assert self._expected_sh_cmds, ( 179 f"Missing expected sh_cmds, but got: {args}") 180 # Convert all args to str first, sh accepts both str and Paths. 181 expected = tuple(map(str, self._expected_sh_cmds[0])) 182 str_args = tuple(map(str, args)) 183 assert expected == str_args, (f"After {len(self._sh_cmds)} cmds: \n" 184 f" expected: {expected}\n" 185 f" got: {str_args}") 186 self._expected_sh_cmds.pop(0) 187 self._sh_cmds.append(args) 188 if not self._sh_results: 189 cmd = shlex.join(map(str, args)) 190 raise ValueError(f"After {len(self._sh_cmds)} cmds: " 191 f"MockPlatform has no more sh outputs for cmd: {cmd}") 192 return self._sh_results.pop(0) 193 194 def sh(self, 195 *args: Union[str, pathlib.Path], 196 shell: bool = False, 197 capture_output: bool = False, 198 stdout=None, 199 stderr=None, 200 stdin=None, 201 env: Optional[Mapping[str, str]] = None, 202 quiet: bool = False, 203 check: bool = False): 204 del capture_output, stderr, stdin, stdout 205 self.sh_stdout(*args, shell=shell, quiet=quiet, env=env, check=check) 206 # TODO: Generalize this in the future, to mimic failing `sh` calls. 207 return CompletedProcess(args, 0) 208 209 210class PosixMockPlatformMixin(MockPlatformMixin): 211 pass 212 213 214class WinMockPlatformMixin(MockPlatformMixin): 215 # TODO: use wrapper fake path to get windows-path formatting by default 216 # when running on posix. 217 218 def path(self, path: pth.AnyPathLike) -> pth.AnyPath: 219 return pathlib.PureWindowsPath(path) 220 221 222class LinuxMockPlatform(PosixMockPlatformMixin, LinuxPlatform): 223 pass 224 225 226class LinuxSshMockPlatform(PosixMockPlatformMixin, LinuxSshPlatform): 227 pass 228 229 230class ChromeOsSshMockPlatform(PosixMockPlatformMixin, ChromeOsSshPlatform): 231 pass 232 233 234class MacOsMockPlatform(PosixMockPlatformMixin, MacOSPlatform): 235 pass 236 237 238class WinMockPlatform(WinMockPlatformMixin, WinPlatform): 239 pass 240 241 242class MockAdb(Adb): 243 244 def start_server(self) -> None: 245 pass 246 247 def stop_server(self) -> None: 248 pass 249 250 def kill_server(self) -> None: 251 pass 252 253 254class AndroidAdbMockPlatform(MockPlatformMixin, AndroidAdbPlatform): 255 pass 256 257 258class GenericMockPlatform(MockPlatformMixin, Platform): 259 pass 260 261 262if plt.PLATFORM.is_linux: 263 MockPlatform = LinuxMockPlatform 264elif plt.PLATFORM.is_macos: 265 MockPlatform = MacOsMockPlatform 266elif plt.PLATFORM.is_win: 267 MockPlatform = WinMockPlatform 268else: 269 raise RuntimeError(f"Unsupported platform: {plt.PLATFORM}") 270 271 272class MockStory(Story): 273 274 @classmethod 275 def all_story_names(cls): 276 return ["story_1", "story_2"] 277 278 def run(self, run: Run) -> None: 279 pass 280 281 282class MockBenchmark(SubStoryBenchmark): 283 NAME = "mock-benchmark" 284 DEFAULT_STORY_CLS = MockStory 285 286 287class MockCLI(CrossBenchCLI): 288 runner: Runner 289 platform: Platform 290 291 def __init__(self, *args, **kwargs) -> None: 292 self.platform = kwargs.pop("platform") 293 super().__init__(*args, **kwargs) 294 295 def _get_runner(self, args, benchmark, env_config, env_validation_mode, 296 timing): 297 if not args.out_dir: 298 # Use stable mock out dir 299 args.out_dir = pathlib.Path("/results") 300 assert not args.out_dir.exists() 301 runner_kwargs = self.RUNNER_CLS.kwargs_from_cli(args) 302 self.runner = self.RUNNER_CLS( 303 benchmark=benchmark, 304 env_config=env_config, 305 env_validation_mode=env_validation_mode, 306 timing=timing, 307 **runner_kwargs, 308 # Use custom platform 309 platform=self.platform) 310 return self.runner 311