1# Copyright 2022 The Chromium Authors 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5from __future__ import annotations 6 7import atexit 8import datetime as dt 9import logging 10import os 11import shlex 12import sys 13import textwrap 14import threading 15import time 16import urllib 17import urllib.error 18import urllib.parse as urlparse 19import urllib.request 20from subprocess import Popen, TimeoutExpired 21from typing import (TYPE_CHECKING, Any, Callable, Dict, Final, Iterable, 22 Iterator, List, Optional, Tuple, Type, TypeVar, Union) 23 24from crossbench import plt 25 26if TYPE_CHECKING: 27 import signal 28 29 from crossbench.path import AnyPath, LocalPath 30 31 InputT = TypeVar("InputT") 32 KeyT = TypeVar("KeyT") 33 GroupT = TypeVar("GroupT") 34 PathT = TypeVar("PathT", bound=AnyPath) 35 36assert hasattr(shlex, 37 "join"), ("Please update to python v3.8 that has shlex.join") 38 39 40 41def group_by(collection: Iterable[InputT], 42 key: Callable[[InputT], KeyT], 43 value: Optional[Callable[[InputT], Any]] = None, 44 group: Optional[Callable[[KeyT], GroupT]] = None, 45 sort_key: Optional[Callable[[Tuple[KeyT, GroupT]], Any]] = str 46 ) -> Dict[KeyT, GroupT]: 47 """ 48 Works similar to itertools.groupby but does a global, SQL-style grouping 49 instead of a line-by-line basis like uniq. 50 51 key: a function that returns the grouping key for a group 52 group: a function that accepts a group_key and returns a group object that 53 has an append() method. 54 """ 55 assert key, "No key function provided" 56 key_fn = key 57 value_fn = value or (lambda item: item) 58 group_fn: Callable[[KeyT], GroupT] = group or (lambda key: []) 59 groups: Dict[KeyT, GroupT] = {} 60 for input_item in collection: 61 group_key: KeyT = key_fn(input_item) 62 group_item = value_fn(input_item) 63 if group_key not in groups: 64 new_group: GroupT = group_fn(group_key) 65 groups[group_key] = new_group 66 new_group.append(group_item) 67 else: 68 groups[group_key].append(group_item) 69 if sort_key: 70 # sort keys as well for more predictable behavior 71 return dict(sorted(groups.items(), key=sort_key)) 72 return dict(groups.items()) 73 74 75def sort_by_file_size(files: Iterable[PathT], 76 platform: plt.Platform = plt.PLATFORM) -> List[PathT]: 77 return sorted(files, key=lambda f: (platform.file_size(f), f.name)) 78 79 80SIZE_UNITS: Final[Tuple[str, ...]] = ("B", "KiB", "MiB", "GiB", "TiB") 81 82 83def get_file_size(file: AnyPath, 84 digits: int = 2, 85 platform: plt.Platform = plt.PLATFORM) -> str: 86 size: float = float(platform.file_size(file)) 87 unit_index = 0 88 divisor = 1024.0 89 while (unit_index < len(SIZE_UNITS)) and size >= divisor: 90 unit_index += 1 91 size /= divisor 92 return f"{size:.{digits}f} {SIZE_UNITS[unit_index]}" 93 94# ============================================================================= 95 96 97def urlopen(url: str, timeout: Union[int, float] = 10): 98 try: 99 logging.debug("Opening url: %s", url) 100 return urllib.request.urlopen(url, timeout=timeout) 101 except (urllib.error.HTTPError, urllib.error.URLError) as e: 102 logging.info("Could not load url=%s", url) 103 raise e 104 105 106# ============================================================================= 107 108 109class ChangeCWD: 110 111 def __init__(self, destination: LocalPath) -> None: 112 self.new_dir = destination 113 self.prev_dir: Optional[str] = None 114 115 def __enter__(self) -> None: 116 self.prev_dir = os.getcwd() 117 os.chdir(self.new_dir) 118 logging.debug("CWD=%s", self.new_dir) 119 120 def __exit__(self, exc_type, exc_value, exc_traceback) -> None: 121 assert self.prev_dir, "ChangeCWD was not entered correctly." 122 os.chdir(self.prev_dir) 123 124 125class SystemSleepPreventer: 126 """ 127 Prevent the system from going to sleep while running the benchmark. 128 """ 129 130 def __init__(self) -> None: 131 self._process: Optional[Popen] = None 132 133 def __enter__(self) -> None: 134 if plt.PLATFORM.is_macos: 135 self._process = plt.PLATFORM.popen("caffeinate", "-imdsu") 136 atexit.register(self.stop_process) 137 # TODO: Add linux support 138 139 def __exit__(self, exc_type, exc_value, exc_traceback) -> None: 140 self.stop_process() 141 142 def stop_process(self) -> None: 143 if self._process: 144 self._process.kill() 145 self._process = None 146 147 148class TimeScope: 149 """ 150 Measures and logs the time spend during the lifetime of the TimeScope. 151 """ 152 153 def __init__(self, message: str, level: int = 3) -> None: 154 self._message = message 155 self._level = level 156 self._start: Optional[dt.datetime] = None 157 self._duration: dt.timedelta = dt.timedelta() 158 159 @property 160 def message(self) -> str: 161 return self._message 162 163 @property 164 def duration(self) -> dt.timedelta: 165 return self._duration 166 167 def __enter__(self) -> TimeScope: 168 self._start = dt.datetime.now() 169 return self 170 171 def __exit__(self, exc_type, exc_value, exc_traceback) -> None: 172 assert self._start 173 self._duration = dt.datetime.now() - self._start 174 logging.log(self._level, "%s duration=%s", self._message, self._duration) 175 176 177def as_timedelta(value: Union[int, float, dt.timedelta]) -> dt.timedelta: 178 if isinstance(value, dt.timedelta): 179 return value 180 return dt.timedelta(seconds=value) 181 182 183class WaitRange: 184 """ 185 Create wait/sleep ranges with the given parameters: 186 187 If present we start with the initial delay, and then exponentially 188 increase the sleep/wait time by the given factor, until we reach the max 189 sleep time. 190 191 | delay | min | min * factor | ... | min * factor ** N | max | ... | max | 192 | ----------------------------- timeout ---------------------------------| 193 194 The timeout puts an upper bound to the total sleep time when using 195 wait_with_backoff(). 196 """ 197 min: dt.timedelta 198 max: dt.timedelta 199 initial_sleep: dt.timedelta 200 max_iterations: Optional[int] 201 202 def __init__( 203 self, 204 min: Union[int, float, dt.timedelta] = 0.1, # pylint: disable=redefined-builtin 205 timeout: Union[int, float, dt.timedelta] = 10, 206 factor: float = 1.01, 207 max: Optional[Union[int, float, dt.timedelta]] = None, # pylint: disable=redefined-builtin 208 max_iterations: Optional[int] = None, 209 delay: Union[int, float, dt.timedelta] = 0) -> None: 210 self.min = as_timedelta(min) 211 assert self.min.total_seconds() > 0 212 if not max: 213 self.max = self.min * 10 214 else: 215 self.max = as_timedelta(max) 216 assert self.min <= self.max 217 assert 1.0 < factor 218 self.factor = factor 219 self.timeout = as_timedelta(timeout) 220 assert 0 < self.timeout.total_seconds() 221 self.delay = as_timedelta(delay) 222 assert self.delay <= self.timeout 223 assert max_iterations is None or max_iterations > 0 224 self.max_iterations = max_iterations 225 226 def __iter__(self) -> Iterator[dt.timedelta]: 227 i = 0 228 if self.delay: 229 yield self.delay 230 current_sleep = self.min 231 while self.max_iterations is None or i < self.max_iterations: 232 yield current_sleep 233 current_sleep = min(current_sleep * self.factor, self.max) 234 i += 1 235 236 def wait_with_backoff( 237 self, 238 platform: plt.Platform = plt.PLATFORM) -> Iterator[Tuple[float, float]]: 239 start = dt.datetime.now() 240 timeout = self.timeout 241 for sleep_for in self: 242 duration = dt.datetime.now() - start 243 if duration > self.timeout: 244 raise TimeoutError(f"Waited for {duration}") 245 time_left = timeout - duration 246 yield duration.total_seconds(), time_left.total_seconds() 247 platform.sleep(sleep_for.total_seconds()) 248 249 250def wait_with_backoff( 251 wait_range: Union[int, float, dt.timedelta, WaitRange], 252 platform: plt.Platform = plt.PLATFORM) -> Iterator[Tuple[float, float]]: 253 if not isinstance(wait_range, WaitRange): 254 wait_range = WaitRange(timeout=wait_range) 255 return wait_range.wait_with_backoff(platform) 256 257 258class DurationMeasureContext: 259 260 def __init__(self, durations: Durations, name: str) -> None: 261 self._start_time = dt.datetime.utcfromtimestamp(0) 262 self._durations = durations 263 self._name = name 264 265 def __enter__(self) -> DurationMeasureContext: 266 self._start_time = dt.datetime.now() 267 return self 268 269 def __exit__(self, exc_type, exc_value, traceback) -> None: 270 assert self._start_time 271 delta = dt.datetime.now() - self._start_time 272 self._durations[self._name] = delta 273 274 275class Durations: 276 """ 277 Helper object to track durations. 278 """ 279 280 def __init__(self) -> None: 281 self._durations: Dict[str, dt.timedelta] = {} 282 283 def __getitem__(self, name: str) -> dt.timedelta: 284 return self._durations[name] 285 286 def __setitem__(self, name: str, duration: dt.timedelta) -> None: 287 assert name not in self._durations, f"Cannot set '{name}' duration twice!" 288 self._durations[name] = duration 289 290 def __len__(self) -> int: 291 return len(self._durations) 292 293 def measure(self, name: str) -> DurationMeasureContext: 294 assert name not in self._durations, ( 295 f"Cannot measure '{name}' duration twice!") 296 return DurationMeasureContext(self, name) 297 298 def to_json(self) -> Dict[str, float]: 299 return { 300 name: self._durations[name].total_seconds() 301 for name in sorted(self._durations.keys()) 302 } 303 304 305def wrap_lines(body: str, width: int = 80, indent: str = "") -> Iterable[str]: 306 for line in body.splitlines(): 307 if len(line) <= width: 308 yield f"{indent}{line}" 309 continue 310 for split in textwrap.wrap(line, width): 311 yield f"{indent}{split}" 312 313 314def type_name(t: Type) -> str: 315 module = t.__module__ 316 if not module: 317 return t.__qualname__ 318 return f"{module}.{t.__qualname__}" 319 320 321class Spinner: 322 CURSORS = "◐◓◑◒" 323 324 def __init__(self, sleep: float = 0.5) -> None: 325 self._is_running = False 326 self._sleep_time = sleep 327 328 def __enter__(self) -> None: 329 # Only enable the spinner if the output is an interactive terminal. 330 is_atty = hasattr(sys.stdout, "isatty") and sys.stdout.isatty() 331 if is_atty: 332 self._is_running = True 333 threading.Thread(target=self._spin).start() 334 335 def __exit__(self, exc_type, exc_value, traceback) -> None: 336 if self._is_running: 337 self._is_running = False 338 self._sleep() 339 340 def _cursors(self) -> Iterable[str]: 341 while True: 342 yield from Spinner.CURSORS 343 344 def _spin(self) -> None: 345 stdout = sys.stdout 346 for cursor in self._cursors(): 347 if not self._is_running: 348 return 349 # Print the current wait-cursor and send a carriage return to move to the 350 # start of the line. 351 stdout.write(f" {cursor}\r") 352 stdout.flush() 353 self._sleep() 354 355 def _sleep(self) -> None: 356 time.sleep(self._sleep_time) 357 358 359 360def update_url_query(url: str, query_params: Dict[str, str]) -> str: 361 parsed_url = urlparse.urlparse(url) 362 query = dict(urlparse.parse_qsl(parsed_url.query)) 363 query.update(query_params) 364 parsed_url = parsed_url._replace(query=urlparse.urlencode(query, doseq=True)) 365 return parsed_url.geturl() 366 367 368def wait_and_kill(process: Popen, 369 timeout=1, 370 signal: Optional[signal.Signals] = None) -> None: 371 """Graceful process termination: 372 1. Send signal if provided, 373 2. wait for the given time, 374 3. terminate(), 375 4. Last stage: kill process. 376 """ 377 logging.debug("wait_and_kill: %s", process) 378 try: 379 wait_and_terminate(process, timeout, signal) 380 finally: 381 try: 382 process.kill() 383 except ProcessLookupError: 384 pass 385 386 387def wait_and_terminate(process, 388 timeout=1, 389 signal: Optional[signal.Signals] = None) -> None: 390 if process.poll() is not None: 391 return 392 logging.debug("Terminating process: %s", process) 393 try: 394 if signal: 395 process.send_signal(signal) 396 process.wait(timeout) 397 return 398 except TimeoutExpired as e: 399 logging.debug("Got timeout while waiting " 400 "for process shutdown (%s): %s", process, e) 401 except Exception as e: # pylint: disable=broad-except 402 logging.debug("Ignoring exception during process termination: %s", e) 403 finally: 404 try: 405 process.terminate() 406 except ProcessLookupError: 407 pass 408 409 410class RepeatTimer(threading.Timer): 411 412 def run(self) -> None: 413 while not self.finished.wait(self.interval): 414 self.function(*self.args, **self.kwargs) 415 416 def __enter__(self, *args, **kwargs): 417 self.start() 418 419 def __exit__(self, *args, **kwargs): 420 self.cancel() 421 422 423def input_with_timeout(timeout=dt.timedelta(seconds=10), default=None): 424 result_container = [default] 425 wait = threading.Thread( 426 target=_input, args=[ 427 result_container, 428 ]) 429 wait.daemon = True 430 wait.start() 431 wait.join(timeout=timeout.total_seconds()) 432 return result_container[0] 433 434 435def _input(results_container): 436 try: 437 results_container[0] = input() 438 except KeyboardInterrupt: 439 pass 440