• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2022 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from __future__ import annotations
6
7import atexit
8import datetime as dt
9import logging
10import os
11import shlex
12import sys
13import textwrap
14import threading
15import time
16import urllib
17import urllib.error
18import urllib.parse as urlparse
19import urllib.request
20from subprocess import Popen, TimeoutExpired
21from typing import (TYPE_CHECKING, Any, Callable, Dict, Final, Iterable,
22                    Iterator, List, Optional, Tuple, Type, TypeVar, Union)
23
24from crossbench import plt
25
26if TYPE_CHECKING:
27  import signal
28
29  from crossbench.path import AnyPath, LocalPath
30
31  InputT = TypeVar("InputT")
32  KeyT = TypeVar("KeyT")
33  GroupT = TypeVar("GroupT")
34  PathT = TypeVar("PathT", bound=AnyPath)
35
# shlex.join() was added in Python 3.8: fail early on older interpreters.
assert hasattr(shlex, "join"), (
    "Please update to python v3.8 that has shlex.join")
38
39
40
def group_by(collection: Iterable[InputT],
             key: Callable[[InputT], KeyT],
             value: Optional[Callable[[InputT], Any]] = None,
             group: Optional[Callable[[KeyT], GroupT]] = None,
             sort_key: Optional[Callable[[Tuple[KeyT, GroupT]], Any]] = str
            ) -> Dict[KeyT, GroupT]:
  """
  Groups all items of a collection by key, comparable to SQL's GROUP BY.

  Unlike itertools.groupby, items with the same key always end up in the same
  group, even if they are not adjacent in the input.

  collection: the items to group.
  key:        maps an item to its grouping key.
  value:      optional, maps an item to the value that is stored in the group,
              defaults to the item itself.
  group:      optional, accepts a grouping key and returns a new group object
              that has an append() method, defaults to an empty list.
  sort_key:   optional, key function applied to the (key, group) tuples to
              order the result. Pass None to keep the first-seen key order.
  """
  assert key, "No key function provided"
  to_value = value or (lambda item: item)
  make_group: Callable[[KeyT], GroupT] = group or (lambda _: [])
  result: Dict[KeyT, GroupT] = {}
  for item in collection:
    item_key: KeyT = key(item)
    item_value = to_value(item)
    if item_key in result:
      bucket = result[item_key]
    else:
      bucket = make_group(item_key)
      result[item_key] = bucket
    bucket.append(item_value)
  if not sort_key:
    return dict(result.items())
  # Sort the groups as well for more predictable behavior.
  return dict(sorted(result.items(), key=sort_key))
73
74
def sort_by_file_size(files: Iterable[PathT],
                      platform: plt.Platform = plt.PLATFORM) -> List[PathT]:
  """
  Returns the files as a new list, ordered by ascending file size as reported
  by platform.file_size(). Files of equal size are ordered by their name.
  """

  def size_then_name(file: PathT):
    return (platform.file_size(file), file.name)

  return sorted(files, key=size_then_name)
78
79
SIZE_UNITS: Final[Tuple[str, ...]] = ("B", "KiB", "MiB", "GiB", "TiB")


def get_file_size(file: AnyPath,
                  digits: int = 2,
                  platform: plt.Platform = plt.PLATFORM) -> str:
  """
  Returns the size of the given file as human-readable string using binary
  units, for instance "1.50 KiB".

  file:     the path that is passed to platform.file_size().
  digits:   number of decimal places in the formatted size.
  platform: the platform used to look up the file size.

  Sizes beyond the largest entry in SIZE_UNITS are reported as a multiple of
  that largest unit.
  """
  size: float = float(platform.file_size(file))
  unit_index = 0
  divisor = 1024.0
  # Stop at the last unit: advancing any further would index past the end of
  # SIZE_UNITS and raise an IndexError for very large files.
  last_unit_index = len(SIZE_UNITS) - 1
  while unit_index < last_unit_index and size >= divisor:
    unit_index += 1
    size /= divisor
  return f"{size:.{digits}f} {SIZE_UNITS[unit_index]}"
93
94# =============================================================================
95
96
def urlopen(url: str, timeout: Union[int, float] = 10):
  """
  Opens the url with urllib.request.urlopen() using the given timeout and
  returns the response object.

  HTTP and URL errors are logged and then re-raised to the caller.
  """
  logging.debug("Opening url: %s", url)
  try:
    response = urllib.request.urlopen(url, timeout=timeout)
  except (urllib.error.HTTPError, urllib.error.URLError) as error:
    logging.info("Could not load url=%s", url)
    raise error
  return response
104
105
106# =============================================================================
107
108
class ChangeCWD:
  """
  Context manager that switches the current working directory to the given
  destination and restores the previous working directory on exit.
  """

  def __init__(self, destination: LocalPath) -> None:
    self.prev_dir: Optional[str] = None
    self.new_dir = destination

  def __enter__(self) -> None:
    # Remember where we came from before switching directories.
    previous = os.getcwd()
    self.prev_dir = previous
    os.chdir(self.new_dir)
    logging.debug("CWD=%s", self.new_dir)

  def __exit__(self, exc_type, exc_value, exc_traceback) -> None:
    previous = self.prev_dir
    assert previous, "ChangeCWD was not entered correctly."
    os.chdir(previous)
123
124
class SystemSleepPreventer:
  """
  Context manager that prevents the system from going to sleep while running
  the benchmark.

  Only macOS is handled so far (by spawning `caffeinate`), on other platforms
  entering the context does nothing.
  """

  def __init__(self) -> None:
    self._process: Optional[Popen] = None

  def __enter__(self) -> None:
    # TODO: Add linux support
    if not plt.PLATFORM.is_macos:
      return
    self._process = plt.PLATFORM.popen("caffeinate", "-imdsu")
    # Also stop the helper process if the interpreter exits without leaving
    # the context first.
    atexit.register(self.stop_process)

  def __exit__(self, exc_type, exc_value, exc_traceback) -> None:
    self.stop_process()

  def stop_process(self) -> None:
    """Kills the helper process if one is running, otherwise does nothing."""
    process = self._process
    if not process:
      return
    process.kill()
    self._process = None
146
147
class TimeScope:
  """
  Context manager that measures the time spent inside its scope and logs the
  duration together with the given message when the scope is left.
  """

  def __init__(self, message: str, level: int = 3) -> None:
    self._message = message
    self._level = level
    self._start: Optional[dt.datetime] = None
    self._duration: dt.timedelta = dt.timedelta()

  @property
  def duration(self) -> dt.timedelta:
    return self._duration

  @property
  def message(self) -> str:
    return self._message

  def __enter__(self) -> TimeScope:
    self._start = dt.datetime.now()
    return self

  def __exit__(self, exc_type, exc_value, exc_traceback) -> None:
    start = self._start
    assert start
    end = dt.datetime.now()
    self._duration = end - start
    logging.log(self._level, "%s duration=%s", self._message, self._duration)
175
176
def as_timedelta(value: Union[int, float, dt.timedelta]) -> dt.timedelta:
  """
  Converts plain numbers, interpreted as seconds, to a timedelta.
  Values that already are timedeltas are returned unchanged.
  """
  if not isinstance(value, dt.timedelta):
    return dt.timedelta(seconds=value)
  return value
181
182
class WaitRange:
  """
  Create wait/sleep ranges with the given parameters:

  If a delay is given we start with it, and then exponentially increase the
  sleep/wait time, starting at min, by the given factor, until we reach the
  max sleep time.

  | delay | min | min * factor | ... | min * factor ** N | max | ... | max |
  | ----------------------------- timeout ---------------------------------|

  The timeout limits how long wait_with_backoff() keeps going: once the
  elapsed time exceeds the timeout, a TimeoutError is raised.

  min:            first sleep time, has to be > 0.
  timeout:        time budget for wait_with_backoff(), has to be > 0.
  factor:         multiplier applied to the sleep time after every
                  iteration, has to be > 1.
  max:            upper bound for a single sleep time, defaults to 10 * min.
  max_iterations: optional limit for the number of sleep times produced
                  after the initial delay.
  delay:          optional initial sleep time, has to be <= timeout.

  Plain numbers are interpreted as seconds.
  """
  min: dt.timedelta
  max: dt.timedelta
  factor: float
  timeout: dt.timedelta
  delay: dt.timedelta
  max_iterations: Optional[int]

  def __init__(
      self,
      min: Union[int, float, dt.timedelta] = 0.1,  # pylint: disable=redefined-builtin
      timeout: Union[int, float, dt.timedelta] = 10,
      factor: float = 1.01,
      max: Optional[Union[int, float, dt.timedelta]] = None,  # pylint: disable=redefined-builtin
      max_iterations: Optional[int] = None,
      delay: Union[int, float, dt.timedelta] = 0) -> None:
    self.min = as_timedelta(min)
    assert self.min.total_seconds() > 0
    if not max:
      self.max = self.min * 10
    else:
      self.max = as_timedelta(max)
    assert self.min <= self.max
    assert 1.0 < factor
    self.factor = factor
    self.timeout = as_timedelta(timeout)
    assert 0 < self.timeout.total_seconds()
    self.delay = as_timedelta(delay)
    assert self.delay <= self.timeout
    assert max_iterations is None or max_iterations > 0
    self.max_iterations = max_iterations

  def __iter__(self) -> Iterator[dt.timedelta]:
    """
    Yields the sleep times: the delay first (if set), then the exponentially
    growing values that are capped at max. Without max_iterations this
    iterator never ends.
    """
    # The initial delay does not count towards max_iterations.
    if self.delay:
      yield self.delay
    iteration = 0
    current_sleep = self.min
    while self.max_iterations is None or iteration < self.max_iterations:
      yield current_sleep
      current_sleep = min(current_sleep * self.factor, self.max)
      iteration += 1

  def wait_with_backoff(
      self,
      platform: plt.Platform = plt.PLATFORM) -> Iterator[Tuple[float, float]]:
    """
    Yields (elapsed seconds, seconds left) tuples and sleeps via
    platform.sleep() for the next sleep time each time the caller resumes the
    iteration.

    Raises a TimeoutError once the elapsed time exceeds the timeout. The
    iteration ends without an error if max_iterations is reached first.
    """
    start = dt.datetime.now()
    timeout = self.timeout
    for sleep_for in self:
      duration = dt.datetime.now() - start
      if duration > timeout:
        raise TimeoutError(f"Waited for {duration}")
      time_left = timeout - duration
      yield duration.total_seconds(), time_left.total_seconds()
      platform.sleep(sleep_for.total_seconds())
248
249
def wait_with_backoff(
    wait_range: Union[int, float, dt.timedelta, WaitRange],
    platform: plt.Platform = plt.PLATFORM) -> Iterator[Tuple[float, float]]:
  """
  Convenience wrapper around WaitRange.wait_with_backoff().

  A wait_range that is not a WaitRange is used as the timeout of a new
  WaitRange with otherwise default settings.
  """
  if isinstance(wait_range, WaitRange):
    backoff = wait_range
  else:
    backoff = WaitRange(timeout=wait_range)
  return backoff.wait_with_backoff(platform)
256
257
class DurationMeasureContext:
  """
  Context manager that measures the time spent inside its scope and stores
  the result under the given name in the durations object on exit.
  """

  def __init__(self, durations: Durations, name: str) -> None:
    # Placeholder (naive unix epoch) until __enter__ records the real start.
    # This is the same value the deprecated dt.datetime.utcfromtimestamp(0)
    # produced, without triggering a DeprecationWarning on python 3.12+.
    self._start_time = dt.datetime(1970, 1, 1)
    self._durations = durations
    self._name = name

  def __enter__(self) -> DurationMeasureContext:
    self._start_time = dt.datetime.now()
    return self

  def __exit__(self, exc_type, exc_value, traceback) -> None:
    assert self._start_time
    delta = dt.datetime.now() - self._start_time
    # The duration is recorded even if the scope was left via an exception.
    self._durations[self._name] = delta
273
274
class Durations:
  """
  Helper object to track named durations.
  Each name can only be set or measured once.
  """

  def __init__(self) -> None:
    self._durations: Dict[str, dt.timedelta] = {}

  def __len__(self) -> int:
    return len(self._durations)

  def __getitem__(self, name: str) -> dt.timedelta:
    return self._durations[name]

  def __setitem__(self, name: str, duration: dt.timedelta) -> None:
    assert name not in self._durations, f"Cannot set '{name}' duration twice!"
    self._durations[name] = duration

  def measure(self, name: str) -> DurationMeasureContext:
    """
    Returns a context manager that stores the time spent in its scope under
    the given name.
    """
    assert name not in self._durations, (
        f"Cannot measure '{name}' duration twice!")
    return DurationMeasureContext(self, name)

  def to_json(self) -> Dict[str, float]:
    """Returns all durations in seconds, ordered by name."""
    # Names are unique dict keys, so sorting the items orders them by name.
    return {
        name: duration.total_seconds()
        for name, duration in sorted(self._durations.items())
    }
303
304
def wrap_lines(body: str, width: int = 80, indent: str = "") -> Iterable[str]:
  """
  Yields the lines of body, each prefixed with indent.

  Lines that are longer than width are split up with textwrap.wrap(), shorter
  lines are passed through as they are. The indent is not counted towards
  the width.
  """
  for line in body.splitlines():
    fits = len(line) <= width
    segments = [line] if fits else textwrap.wrap(line, width)
    for segment in segments:
      yield f"{indent}{segment}"
312
313
def type_name(t: Type) -> str:
  """
  Returns the qualified name of the given type, prefixed with its module
  name if the type has a non-empty one.
  """
  module = t.__module__
  qualname = t.__qualname__
  return f"{module}.{qualname}" if module else qualname
319
320
class Spinner:
  """
  Context manager that shows an animated wait-cursor on stdout while its
  scope is active. Nothing is printed if stdout is not an interactive
  terminal.
  """
  CURSORS = "◐◓◑◒"

  def __init__(self, sleep: float = 0.5) -> None:
    self._sleep_time = sleep
    self._is_running = False

  def __enter__(self) -> None:
    # Only enable the spinner if the output is an interactive terminal.
    if not hasattr(sys.stdout, "isatty") or not sys.stdout.isatty():
      return
    self._is_running = True
    spinner_thread = threading.Thread(target=self._spin)
    spinner_thread.start()

  def __exit__(self, exc_type, exc_value, traceback) -> None:
    if not self._is_running:
      return
    self._is_running = False
    # Presumably this gives the spinner thread the time to notice the
    # cleared flag before we continue.
    self._sleep()

  def _cursors(self) -> Iterable[str]:
    # Endlessly cycle through all cursor characters.
    while True:
      for cursor in Spinner.CURSORS:
        yield cursor

  def _spin(self) -> None:
    out = sys.stdout
    for cursor in self._cursors():
      if not self._is_running:
        break
      # Print the current wait-cursor and send a carriage return to move to the
      # start of the line.
      out.write(f" {cursor}\r")
      out.flush()
      self._sleep()

  def _sleep(self) -> None:
    time.sleep(self._sleep_time)
357
358
359
def update_url_query(url: str, query_params: Dict[str, str]) -> str:
  """
  Returns the url with the given query parameters added to its query string.
  Existing parameters with the same name are overwritten.
  """
  url_parts = urlparse.urlparse(url)
  merged_query = dict(urlparse.parse_qsl(url_parts.query))
  merged_query.update(query_params)
  encoded_query = urlparse.urlencode(merged_query, doseq=True)
  return url_parts._replace(query=encoded_query).geturl()
366
367
def wait_and_kill(process: Popen,
                  timeout=1,
                  signal: Optional[signal.Signals] = None) -> None:
  """Graceful process termination:
  1. Send signal if provided,
  2. wait for the given time,
  3. terminate(),
  4. Last stage: kill process.

  The final kill() is always attempted, even if one of the previous steps
  raised an exception.
  """
  logging.debug("wait_and_kill: %s", process)
  try:
    wait_and_terminate(process, timeout, signal)
  finally:
    _kill_ignoring_missing_process(process)


def _kill_ignoring_missing_process(process: Popen) -> None:
  """Calls process.kill() and ignores a ProcessLookupError."""
  try:
    process.kill()
  except ProcessLookupError:
    pass
385
386
def wait_and_terminate(process,
                       timeout=1,
                       signal: Optional[signal.Signals] = None) -> None:
  """
  Sends the optional signal to the process, waits up to timeout for it to
  exit and finally calls terminate() on it.

  Nothing happens if poll() reports that the process has already exited.
  Timeouts and other exceptions while signalling or waiting are only logged.
  """
  already_exited = process.poll() is not None
  if already_exited:
    return
  logging.debug("Terminating process: %s", process)
  try:
    if signal:
      process.send_signal(signal)
    process.wait(timeout)
  except TimeoutExpired as timeout_error:
    logging.debug("Got timeout while waiting "
                  "for process shutdown (%s): %s", process, timeout_error)
  except Exception as error:  # pylint: disable=broad-except
    logging.debug("Ignoring exception during process termination: %s", error)
  finally:
    # terminate() is called on every path, also after a successful wait().
    try:
      process.terminate()
    except ProcessLookupError:
      pass
408
409
class RepeatTimer(threading.Timer):
  """
  A threading.Timer that keeps calling its function every `interval` seconds
  until it is cancelled, instead of calling it only once.

  Can be used as context manager: the timer thread is started on enter and
  cancelled on exit.
  """

  def run(self) -> None:
    # finished.wait() returns True as soon as cancel() has set the event and
    # False if the interval has passed without a cancellation.
    while not self.finished.wait(self.interval):
      self.function(*self.args, **self.kwargs)

  def __enter__(self, *args, **kwargs) -> RepeatTimer:
    self.start()
    # Return the timer so that `with RepeatTimer(...) as timer:` binds it.
    return self

  def __exit__(self, *args, **kwargs) -> None:
    self.cancel()
421
422
def input_with_timeout(timeout=dt.timedelta(seconds=10), default=None):
  """
  Reads user input on a background daemon thread and waits for at most
  timeout (a timedelta) for it.

  Returns the input, or default if nothing was entered in time.
  """
  # The reader thread hands back its result via the first list entry.
  result_container = [default]
  reader = threading.Thread(
      target=_input, args=[result_container], daemon=True)
  reader.start()
  reader.join(timeout=timeout.total_seconds())
  return result_container[0]
433
434
435def _input(results_container):
436  try:
437    results_container[0] = input()
438  except KeyboardInterrupt:
439    pass
440