# Copyright 2023 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from __future__ import annotations

import json
import logging
import math
from math import floor, log10
from typing import (TYPE_CHECKING, Any, Callable, Dict, Hashable, Iterable,
                    List, Optional, Sequence, Set, Tuple, Union)

from crossbench.probes import helper

if TYPE_CHECKING:
  from crossbench.path import LocalPath
  from crossbench.types import Json, JsonDict


def is_number(value: Any) -> bool:
  """Return True if value is an int or float (note: bool is an int subclass)."""
  return isinstance(value, (int, float))


class Metric:
  """
  Metric provides simple statistical getters if the collected values are
  ints or floats only.
  """

  @classmethod
  def format(cls,
             value: Union[float, int],
             stddev: Optional[float] = None) -> str:
    """Format value and stdev to only expose significant + 1 digits.
    Example outputs:
      100 ± 10%
      100.1 ± 1.2%
      100.12 ± 0.12%
      100.123 ± 0.012%
      100.1235 ± 0.0012%
    """
    if not stddev:
      return str(value)
    stddev = float(stddev)
    if value == 0:
      # A relative (percent) stddev is undefined for a zero value; avoid the
      # ZeroDivisionError below and fall back to the absolute stddev.
      return f"{value} ± {stddev}"
    stddev_significant_digit = int(floor(log10(abs(stddev))))
    # Keep one digit more than the stddev's most significant digit.
    value_width = max(0, 1 - stddev_significant_digit)
    percent = stddev / value * 100
    percent_significant_digit = int(floor(log10(abs(percent))))
    percent_width = max(0, 1 - percent_significant_digit)
    return f"{value:.{value_width}f} ± {percent:.{percent_width}f}%"

  @classmethod
  def from_json(cls, json_data: JsonDict) -> Metric:
    """Recreate a Metric from its json-serialized form (see to_json)."""
    values = json_data["values"]
    assert isinstance(values, list)
    return cls(values)

  def __init__(self, values: Optional[List] = None) -> None:
    self.values = values or []
    # Cached flag: statistical getters are only valid for all-numeric values.
    self._is_numeric: bool = all(map(is_number, self.values))

  def __len__(self) -> int:
    return len(self.values)

  @property
  def is_numeric(self) -> bool:
    return self._is_numeric

  @property
  def min(self) -> float:
    assert self._is_numeric
    return min(self.values)

  @property
  def max(self) -> float:
    assert self._is_numeric
    return max(self.values)

  @property
  def sum(self) -> float:
    assert self._is_numeric
    return sum(self.values)

  @property
  def average(self) -> float:
    assert self._is_numeric
    return sum(self.values) / len(self.values)

  @property
  def geomean(self) -> float:
    assert self._is_numeric
    return geomean(self.values)

  @property
  def stddev(self) -> float:
    assert self._is_numeric
    # We're ignoring here any actual distribution of the data and use this as a
    # rough estimate of the quality of the data
    average = self.average
    variance = 0.0
    for value in self.values:
      variance += (average - value)**2
    variance /= len(self.values)
    return math.sqrt(variance)

  def append(self, value: Any) -> None:
    """Add a single value; a non-numeric value disables the statistic getters."""
    self.values.append(value)
    self._is_numeric = self._is_numeric and is_number(value)

  def to_json(self) -> JsonDict:
    """Serialize to JSON; numeric metrics include aggregate statistics.

    A metric holding a single repeated hashable non-numeric value collapses
    to just that value.
    """
    json_data: JsonDict = {"values": self.values}
    if not self.values:
      return json_data
    if self.is_numeric:
      json_data["min"] = self.min
      average = json_data["average"] = self.average
      json_data["geomean"] = self.geomean
      json_data["max"] = self.max
      json_data["sum"] = self.sum
      stddev = json_data["stddev"] = self.stddev
      if average == 0:
        json_data["stddevPercent"] = 0
      else:
        json_data["stddevPercent"] = (stddev / average) * 100
      return json_data
    # Try to simplify repeated non-numeric values
    if not isinstance(self.values[0], Hashable):
      return json_data
    if len(set(self.values)) == 1:
      return self.values[0]
    return json_data


def geomean(values: Iterable[Union[int, float]]) -> float:
  """Return the geometric mean of values.

  Raises ZeroDivisionError for an empty iterable (callers guard against
  empty inputs, see Metric.to_json).
  """
  product: float = 1
  length: int = 0
  for value in values:
    product *= value
    length += 1
  return product**(1 / length)


def metric_geomean(metric: Metric) -> float:
  """Module-level key function (e.g. usable as MetricsMerger value_fn)."""
  return metric.geomean


class MetricsMerger:
  """
  Merges hierarchical data into 1-level aggregated data;

  Input:
  data_1 ={
    "a": {
      "aa": 1.1,
      "ab": 2
    }
    "b": 2.1
  }
  data_2 = {
    "a": {
      "aa": 1.2
    }
    "b": 2.2,
    "c": 2
  }

  The merged data maps str => Metric():

  MetricsMerger(data_1, data_2).data == {
    "a/aa": Metric(1.1, 1.2)
    "a/ab": Metric(2)
    "b":    Metric(2.1, 2.2)
    "c":    Metric(2)
  }
  """

  @classmethod
  def merge_json_list(cls,
                      files: Iterable[LocalPath],
                      key_fn: Optional[helper.KeyFnType] = None,
                      merge_duplicate_paths: bool = False) -> MetricsMerger:
    """Create a merger from a list of json files previously written by
    MetricsMerger.to_json."""
    merger = cls(key_fn=key_fn)
    for file in files:
      with file.open(encoding="utf-8") as f:
        merger.merge_values(
            json.load(f), merge_duplicate_paths=merge_duplicate_paths)
    return merger

  def __init__(self,
               *args: Union[Dict, List[Dict]],
               key_fn: Optional[helper.KeyFnType] = None):
    """Create a new MetricsMerger

    Args:
      *args (optional): Optional hierarchical data to be merged.
      key_fn (optional): Maps property paths (Tuple[str,...]) to strings used
        as keys to group/merge values, or None to skip property paths.
    """
    self._data: Dict[str, Metric] = {}
    self._key_fn: helper.KeyFnType = key_fn or helper._default_flatten_key_fn
    # Keys dropped because the same path occurred in multiple files
    # (with merge_duplicate_paths=False); never re-added once ignored.
    self._ignored_keys: Set[str] = set()
    for data in args:
      self.add(data)

  @property
  def data(self) -> Dict[str, Metric]:
    return self._data

  def merge_values(self,
                   data: Dict[str, Dict],
                   prefix_path: Tuple[str, ...] = (),
                   merge_duplicate_paths: bool = False) -> None:
    """Merge a previously json-serialized MetricsMerger object"""
    for property_name, item in data.items():
      path = prefix_path + (property_name,)
      key = self._key_fn(path)
      if key is None or key in self._ignored_keys:
        continue
      if key in self._data:
        if merge_duplicate_paths:
          values = self._data[key]
          for value in item["values"]:
            values.append(value)
        else:
          # Fixed log format string: the original read
          # "key='%s" "from multiple files." which rendered as
          # "key='%sfrom multiple files.".
          logging.debug(
              "Removing Metric with the same key-path='%s', key='%s' "
              "from multiple files.", path, key)
          del self._data[key]
          self._ignored_keys.add(key)
      else:
        self._data[key] = Metric.from_json(item)

  def add(self, data: Union[Dict, List[Dict]]) -> None:
    """ Merge "arbitrary" hierarchical data that ends up having primitive leafs.
    Anything that is not a dict is considered a leaf node.
    """
    if isinstance(data, list):
      # Assume that top-level lists are repetitions of the same data
      for item in data:
        self._merge(item)
    else:
      self._merge(data)

  def _merge(self,
             data: Union[Dict, List[Dict]],
             parent_path: Tuple[str, ...] = ()) -> None:
    """Recursively flatten dict leaves into `self._data` keyed by key_fn."""
    assert isinstance(data, dict)
    for property_name, value in data.items():
      path = parent_path + (property_name,)
      key: Optional[str] = self._key_fn(path)
      if key is None:
        continue
      if isinstance(value, dict):
        self._merge(value, path)
      else:
        if key in self._data:
          values = self._data[key]
        else:
          values = self._data[key] = Metric()
        if isinstance(value, list):
          for v in value:
            values.append(v)
        else:
          values.append(value)

  def to_json(self,
              value_fn: Optional[Callable[[Any], Json]] = None,
              sort: bool = True) -> JsonDict:
    """Serialize the merged metrics; value_fn overrides Metric.to_json."""
    items = []
    for key, value in self._data.items():
      assert isinstance(value, Metric)
      if value_fn is None:
        json_value: Json = value.to_json()
      else:
        json_value = value_fn(value)
      items.append((key, json_value))
    if sort:
      # Make sure the data is always in the same order, independent of the input
      # order
      items.sort()
    return dict(items)


class CSVFormatter:
  """
  Headers: [
    ["label_1", "value_1"],
    ["label_2", "value_2"],
  ]
  Input: {
    "A_1/B_1/Async": 1,
    "A_1/B_2/Sync": 2,
    "A_1/Total": 3,
    "Total": 3,
  }
  Output: [
    ["label_1", "", "", "", "value_1"],
    ["label_2", "", "", "", "value_2"],
    ["A_1/B1/Async", "A1", "B1", "Async", 1],
    ["A_1/B2/Sync", "A1", "B2", "Sync", 2],
    ["A_1/Total", "A1", "Total", "", 3],
    ["Total", "Total", "", "", 3],
  ]
  """

  def __init__(self,
               metrics: MetricsMerger,
               value_fn: Optional[Callable[[Any], Any]] = None,
               headers: Sequence[Tuple[Any, ...]] = (),
               include_parts: bool = True,
               sort: bool = True):
    self._table: List[Sequence[Any]] = []
    converted = metrics.to_json(value_fn, sort)
    items = self.format_items(converted, sort=sort)
    max_path_depth: int = self.extract_max_depth(items, include_parts)
    self.append_headers(headers, max_path_depth)
    self.append_body(items, include_parts, max_path_depth)

  def extract_max_depth(self, items: Sequence[Tuple[str, Json]],
                        include_parts: bool) -> int:
    """Return the number of path-part columns (0 when parts are excluded)."""
    max_path_depth = 0
    if include_parts:
      for path, _ in items:
        max_path_depth = max(max_path_depth, path.count("/"))
      max_path_depth += 1
    return max_path_depth

  def append_headers(self, headers: Sequence[Tuple[Any, ...]],
                     max_path_depth: int) -> None:
    """Prepend header rows, padding between label and value columns."""
    header_padding = ("",) * max_path_depth
    for header in headers:
      assert isinstance(header, tuple), (
          f"Additional CSV headers must be tuples, got {type(header)}: "
          f"{header}")
      row = header[:1] + header_padding + header[1:]
      self._table.append(row)

  def append_body(self, items: Sequence[Tuple[str, Json]], include_parts: bool,
                  max_path_depth: int) -> None:
    """Append one row per metric: full path, optional path parts, value."""
    for path, value in items:
      if include_parts:
        parts = tuple(path.split("/"))
        buffer = ("",) * (max_path_depth - len(parts))
        row = (path,) + parts + buffer + (value,)
      else:
        row = (path, value)
      self._table.append(row)

  def format_items(self, data: Dict[str, Json],
                   sort: bool) -> Sequence[Tuple[str, Json]]:
    items = tuple(data.items())
    if not sort:
      return items
    return sorted(items)

  @property
  def table(self) -> List[Sequence[Any]]:
    return self._table