• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2023 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from __future__ import annotations
6
7import json
8import logging
9import math
10from math import floor, log10
11from typing import (TYPE_CHECKING, Any, Callable, Dict, Hashable, Iterable,
12                    List, Optional, Sequence, Set, Tuple, Union)
13
14from crossbench.probes import helper
15
16if TYPE_CHECKING:
17  from crossbench.path import LocalPath
18  from crossbench.types import Json, JsonDict
19
20
21
def is_number(value: Any) -> bool:
  """Return True iff value is a plain numeric type (int or float)."""
  numeric_types = (int, float)
  return isinstance(value, numeric_types)
24
25
26class Metric:
27  """
28  Metric provides simple statistical getters if the collected values are
29  ints or floats only.
30  """
31
32  @classmethod
33  def format(cls,
34             value: Union[float, int],
35             stddev: Optional[float] = None) -> str:
36    """Format value and stdev to only expose significant + 1 digits.
37    Example outputs:
38      100 ± 10%
39      100.1 ± 1.2%
40      100.12 ± 0.12%
41      100.123 ± 0.012%
42      100.1235 ± 0.0012%
43    """
44    if not stddev:
45      return str(value)
46    stddev = float(stddev)
47    stddev_significant_digit = int(floor(log10(abs(stddev))))
48    value_width = max(0, 1 - stddev_significant_digit)
49    percent = stddev / value * 100
50    percent_significant_digit = int(floor(log10(abs(percent))))
51    percent_width = max(0, 1 - percent_significant_digit)
52    return f"{value:.{value_width}f} ± {percent:.{percent_width}f}%"
53
54  @classmethod
55  def from_json(cls, json_data: JsonDict) -> Metric:
56    values = json_data["values"]
57    assert isinstance(values, list)
58    return cls(values)
59
60  def __init__(self, values: Optional[List] = None) -> None:
61    self.values = values or []
62    self._is_numeric: bool = all(map(is_number, self.values))
63
64  def __len__(self) -> int:
65    return len(self.values)
66
67  @property
68  def is_numeric(self) -> bool:
69    return self._is_numeric
70
71  @property
72  def min(self) -> float:
73    assert self._is_numeric
74    return min(self.values)
75
76  @property
77  def max(self) -> float:
78    assert self._is_numeric
79    return max(self.values)
80
81  @property
82  def sum(self) -> float:
83    assert self._is_numeric
84    return sum(self.values)
85
86  @property
87  def average(self) -> float:
88    assert self._is_numeric
89    return sum(self.values) / len(self.values)
90
91  @property
92  def geomean(self) -> float:
93    assert self._is_numeric
94    return geomean(self.values)
95
96  @property
97  def stddev(self) -> float:
98    assert self._is_numeric
99    # We're ignoring here any actual distribution of the data and use this as a
100    # rough estimate of the quality of the data
101    average = self.average
102    variance = 0.0
103    for value in self.values:
104      variance += (average - value)**2
105    variance /= len(self.values)
106    return math.sqrt(variance)
107
108  def append(self, value: Any) -> None:
109    self.values.append(value)
110    self._is_numeric = self._is_numeric and is_number(value)
111
112  def to_json(self) -> JsonDict:
113    json_data: JsonDict = {"values": self.values}
114    if not self.values:
115      return json_data
116    if self.is_numeric:
117      json_data["min"] = self.min
118      average = json_data["average"] = self.average
119      json_data["geomean"] = self.geomean
120      json_data["max"] = self.max
121      json_data["sum"] = self.sum
122      stddev = json_data["stddev"] = self.stddev
123      if average == 0:
124        json_data["stddevPercent"] = 0
125      else:
126        json_data["stddevPercent"] = (stddev / average) * 100
127      return json_data
128    # Try to simplify repeated non-numeric values
129    if not isinstance(self.values[0], Hashable):
130      return json_data
131    if len(set(self.values)) == 1:
132      return self.values[0]
133    return json_data
134
135
def geomean(values: Iterable[Union[int, float]]) -> float:
  """Return the geometric mean: the nth root of the product of n values."""
  collected = list(values)
  return math.prod(collected)**(1 / len(collected))
143
144
def metric_geomean(metric: Metric) -> float:
  """Key helper: extract a Metric's geomean (e.g. for map/sort callbacks)."""
  return metric.geomean
147
148
class MetricsMerger:
  """
  Merges hierarchical data into 1-level aggregated data;

  Input:
  data_1 = {
    "a": {
      "aa": 1.1,
      "ab": 2
    },
    "b": 2.1
  }
  data_2 = {
    "a": {
      "aa": 1.2
    },
    "b": 2.2,
    "c": 2
  }

  The merged data maps str => Metric():

  MetricsMerger(data_1, data_2).data == {
    "a/aa": Metric(1.1, 1.2)
    "a/ab": Metric(2)
    "b":    Metric(2.1, 2.2)
    "c":    Metric(2)
  }
  """

  @classmethod
  def merge_json_list(cls,
                      files: Iterable[LocalPath],
                      key_fn: Optional[helper.KeyFnType] = None,
                      merge_duplicate_paths: bool = False) -> MetricsMerger:
    """Load json files (previously serialized MetricsMerger objects) and
    merge them into a single new MetricsMerger."""
    merger = cls(key_fn=key_fn)
    for file in files:
      with file.open(encoding="utf-8") as f:
        merger.merge_values(
            json.load(f), merge_duplicate_paths=merge_duplicate_paths)
    return merger

  def __init__(self,
               *args: Union[Dict, List[Dict]],
               key_fn: Optional[helper.KeyFnType] = None):
    """Create a new MetricsMerger

    Args:
        *args (optional): Optional hierarchical data to be merged.
        key_fn (optional): Maps property paths (Tuple[str,...]) to strings used
          as keys to group/merge values, or None to skip property paths.
    """
    self._data: Dict[str, Metric] = {}
    self._key_fn: helper.KeyFnType = key_fn or helper._default_flatten_key_fn
    # Keys dropped due to duplicate paths; such keys are never re-added.
    self._ignored_keys: Set[str] = set()
    for data in args:
      self.add(data)

  @property
  def data(self) -> Dict[str, Metric]:
    return self._data

  def merge_values(self,
                   data: Dict[str, Dict],
                   prefix_path: Tuple[str, ...] = (),
                   merge_duplicate_paths: bool = False) -> None:
    """Merge a previously json-serialized MetricsMerger object.

    If merge_duplicate_paths is False, a key seen in multiple inputs is
    removed entirely and remembered, so later occurrences are skipped too.
    """
    for property_name, item in data.items():
      path = prefix_path + (property_name,)
      key = self._key_fn(path)
      if key is None or key in self._ignored_keys:
        continue
      if key in self._data:
        if merge_duplicate_paths:
          values = self._data[key]
          for value in item["values"]:
            values.append(value)
        else:
          # Fixed log message: it previously read "...key='%sfrom multiple
          # files." due to a missing quote and space.
          logging.debug(
              "Removing Metric with the same key-path='%s', key='%s' "
              "from multiple files.", path, key)
          del self._data[key]
          self._ignored_keys.add(key)
      else:
        self._data[key] = Metric.from_json(item)

  def add(self, data: Union[Dict, List[Dict]]) -> None:
    """ Merge "arbitrary" hierarchical data that ends up having primitive leafs.
    Anything that is not a dict is considered a leaf node.
    """
    if isinstance(data, list):
      # Assume that top-level lists are repetitions of the same data
      for item in data:
        self._merge(item)
    else:
      self._merge(data)

  def _merge(
      self, data: Union[Dict,
                        List[Dict]], parent_path: Tuple[str, ...] = ()) -> None:
    """Recursively flatten dict leaves into self._data, keyed by their path."""
    assert isinstance(data, dict)
    for property_name, value in data.items():
      path = parent_path + (property_name,)
      key: Optional[str] = self._key_fn(path)
      if key is None:
        # The key_fn may filter out property paths by returning None.
        continue
      if isinstance(value, dict):
        self._merge(value, path)
        continue
      if key in self._data:
        values = self._data[key]
      else:
        values = self._data[key] = Metric()
      if isinstance(value, list):
        # Leaf lists are treated as repeated samples of the same metric.
        for v in value:
          values.append(v)
      else:
        values.append(value)

  def to_json(self,
              value_fn: Optional[Callable[[Any], Json]] = None,
              sort: bool = True) -> JsonDict:
    """Serialize all merged metrics into a flat json dict.

    Args:
        value_fn (optional): Converts each Metric to its json value;
          defaults to Metric.to_json.
        sort (optional): Emit keys in sorted order for stable output.
    """
    items = []
    for key, value in self._data.items():
      assert isinstance(value, Metric)
      if value_fn is None:
        json_value: Json = value.to_json()
      else:
        json_value = value_fn(value)
      items.append((key, json_value))
    if sort:
      # Make sure the data is always in the same order, independent of the input
      # order
      items.sort()
    return dict(items)
284
285
class CSVFormatter:
  """Flattens merged metrics into a CSV-ready table of rows.

  Headers: [
    ["label_1", "value_1"],
    ["label_2", "value_2"],
  ]
  Input: {
      "A_1/B_1/Async": 1,
      "A_1/B_2/Sync": 2,
      "A_1/Total": 3,
      "Total": 3,
    }
  Output: [
    ["label_1",        "",      "",      "",      "value_1"],
    ["label_2",        "",      "",      "",      "value_2"],
    ["A_1/B_1/Async",  "A_1",   "B_1",   "Async", 1],
    ["A_1/B_2/Sync",   "A_1",   "B_2",   "Sync",  2],
    ["A_1/Total",      "A_1",   "Total", "",      3],
    ["Total",          "Total", "",      "",      3],
  ]
  """

  def __init__(self,
               metrics: MetricsMerger,
               value_fn: Optional[Callable[[Any], Any]] = None,
               headers: Sequence[Tuple[Any, ...]] = (),
               include_parts: bool = True,
               sort: bool = True):
    self._table: List[Sequence[Any]] = []
    flattened = metrics.to_json(value_fn, sort)
    items = self.format_items(flattened, sort=sort)
    depth = self.extract_max_depth(items, include_parts)
    self.append_headers(headers, depth)
    self.append_body(items, include_parts, depth)

  def extract_max_depth(self, items: Sequence[Tuple[str, Json]],
                        include_parts: bool) -> int:
    """Number of path-part columns: deepest "/"-separated path, minimum 1."""
    if not include_parts:
      return 1
    deepest = max((path.count("/") for path, _ in items), default=0)
    return deepest + 1

  def append_headers(self, headers, max_path_depth: int) -> None:
    """Prepend meta rows, padded so their values align with the value column."""
    padding = ("",) * max_path_depth
    for header in headers:
      assert isinstance(header, tuple), (
          f"Additional CSV headers must be tuples, got {type(header)}: "
          f"{header}")
      self._table.append(header[:1] + padding + header[1:])

  def append_body(self, items: Sequence[Tuple[str, Json]], include_parts: bool,
                  max_path_depth: int) -> None:
    """Append one row per metric: full path, optional parts, then the value."""
    for path, value in items:
      if not include_parts:
        self._table.append((path, value))
        continue
      parts = tuple(path.split("/"))
      # Pad shallow paths so every row has the same number of columns.
      filler = ("",) * (max_path_depth - len(parts))
      self._table.append((path,) + parts + filler + (value,))

  def format_items(self, data: Dict[str, Json],
                   sort: bool) -> Sequence[Tuple[str, Json]]:
    """Return (path, value) pairs, optionally in sorted order."""
    pairs = tuple(data.items())
    return sorted(pairs) if sort else pairs

  @property
  def table(self) -> List[Sequence[Any]]:
    return self._table
360