# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from __future__ import annotations

import csv
from typing import (TYPE_CHECKING, Any, Callable, Dict, Final, List, Optional,
                    Sequence, Set, Tuple)

if TYPE_CHECKING:
  from crossbench.path import LocalPath

INTERNAL_NAME_PREFIX: Final[str] = "cb."

KeyFnType = Callable[[Tuple[str, ...]], Optional[str]]


def _default_flatten_key_fn(path: Tuple[str, ...]) -> str:
  return "/".join(path)


class Flatten:
  """
  Creates a sorted, flattened dict of (key-path, value) items from
  hierarchical data.

  input = {"a": {"aa1": 1, "aa2": 2}, "b": 12}
  Flatten(input).data == {
      "a/aa1": 1,
      "a/aa2": 2,
      "b": 12,
  }
  """
  _key_fn: KeyFnType
  _accumulator: Dict[str, Any]

  def __init__(self,
               *args: Dict,
               key_fn: Optional[KeyFnType] = None,
               sort: bool = True) -> None:
    """Flattens the given hierarchical data dicts into a single flat dict.

    Args:
      *args (optional): Hierarchical data to be flattened.
      key_fn (optional): Maps property paths (Tuple[str, ...]) to strings
        used as final result keys. Returning None skips that property path.
      sort (optional): Whether the resulting keys are sorted alphabetically
        (defaults to True).
    """
    self._accumulator = {}
    self._key_fn = key_fn or _default_flatten_key_fn
    self._sort = sort
    self.append(*args)

  @property
  def data(self) -> Dict[str, Any]:
    if not self._sort:
      return dict(self._accumulator)
    items = sorted(self._accumulator.items(), key=lambda item: item[0])
    return dict(items)

  def append(self, *args: Dict, ignore_toplevel: bool = False) -> None:
    toplevel_path: Tuple[str, ...] = tuple()
    for merged_data in args:
      self._flatten(toplevel_path, merged_data, ignore_toplevel)

  def _is_leaf_item(self, item: Any) -> bool:
    if isinstance(item, (str, float, int, list)):
      return True
    if "values" in item and isinstance(item["values"], list):
      return True
    return False

  def _flatten(self,
               parent_path: Tuple[str, ...],
               data: Dict,
               ignore_toplevel: bool = False) -> None:
    for name, item in data.items():
      if item is None:
        continue
      path = parent_path + (name,)
      if self._is_leaf_item(item):
        # ignore_toplevel only skips leaf values directly at the root.
        if ignore_toplevel and parent_path == ():
          continue
        key = self._key_fn(path)
        if key is None:
          continue
        assert isinstance(key, str)
        if key in self._accumulator:
          raise ValueError(f"Duplicate key='{key}' path={path}")
        self._accumulator[key] = item
      else:
        self._flatten(path, item)
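

# A minimal usage sketch (illustrative only, not part of the module API):
# a custom key_fn that drops internal "cb."-prefixed entries by returning
# None. The function name and the input data are made up for demonstration.
def _example_flatten_usage() -> None:
  data = {"cpu": {"user": 1.5, "system": 0.5}, "cb.debug": 1, "wall": 2.0}

  def public_only_key_fn(path: Tuple[str, ...]) -> Optional[str]:
    # Returning None removes the property path from the flattened result.
    if path[0].startswith(INTERNAL_NAME_PREFIX):
      return None
    return "/".join(path)

  flat = Flatten(data, key_fn=public_only_key_fn)
  assert flat.data == {"cpu/system": 0.5, "cpu/user": 1.5, "wall": 2.0}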


def _ljust_row(sequence: List, n: int, fill_value: Any = None) -> List:
  return sequence + ([fill_value] * (n - len(sequence)))


def merge_csv(csv_list: Sequence[LocalPath],
              headers: Optional[List[str]] = None,
              row_header_len: int = 1,
              delimiter: str = "\t") -> List[List[Any]]:
  """
  Merges multiple CSV files column-wise, joined on their row headers.

  File 1:
    Header,     Col Header 1.1, Col Header 1.2
    ...
    Row Header, Data 1.1,       Data 1.2

  File 2:
    Header,     Col Header 2.1,
    ...
    Row Header, Data 2.1,

  The first column (the row headers) must contain the same values in every
  file:

  Merged:
    Header,     Col Header 1.1, Col Header 1.2, Col Header 2.1,
    ...
    Row Header, Data 1.1,       Data 1.2,       Data 2.1,

  If the files have no column headers, an explicit headers list (one entry
  per file) can be passed instead.

  Merged with headers=["File 1", "File 2"]:
    ,           File 1,   ,         File 2,
    Row Header, Data 1.1, Data 1.2, Data 2.1, Data 2.2
  """
  table: List[List[Any]] = []
  # Initial row-headers from the first csv file.
  known_row_headers: Set[Tuple[str, ...]] = set()
  row_header_len = _merge_csv_prepare_row_headers(table, known_row_headers,
                                                  csv_list[0], row_header_len,
                                                  delimiter)

  # Prepare the optional header row; its row-header columns stay empty.
  table_headers: List[Any]
  if headers:
    table_headers = [None] * row_header_len
  else:
    table_headers = []

  table_row_len = row_header_len
  for csv_file in csv_list:
    with csv_file.open(encoding="utf-8") as f:
      csv_data = list(csv.reader(f, delimiter=delimiter))
    table_row_len = _merge_csv_append(csv_data, table, table_headers,
                                      row_header_len, headers,
                                      known_row_headers, table_row_len)

  if table_headers:
    return [table_headers] + table
  return table
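

# A minimal usage sketch (illustrative only, not part of the module API):
# merging two tab-separated files with explicit headers. Plain pathlib.Path
# stands in for LocalPath here, since only .open() is needed.
def _example_merge_csv_usage() -> None:
  import pathlib
  import tempfile

  with tempfile.TemporaryDirectory() as tmp_dir:
    file_a = pathlib.Path(tmp_dir) / "a.csv"
    file_b = pathlib.Path(tmp_dir) / "b.csv"
    file_a.write_text("metric\t1.1\t1.2\nscore\t100\t200\n", encoding="utf-8")
    file_b.write_text("metric\t2.1\nscore\t300\n", encoding="utf-8")
    # Note: merge_csv consumes the passed headers list via headers.pop(0).
    merged = merge_csv([file_a, file_b], headers=["A", "B"])
    # Each file's header sits over its first data column; the leading None
    # covers the row-header column.
    assert merged == [[None, "A", None, "B"],
                      ["metric", "1.1", "1.2", "2.1"],
                      ["score", "100", "200", "300"]]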


def _merge_csv_prepare_row_headers(table: List[List[Any]],
                                   known_row_headers: Set[Tuple[str, ...]],
                                   csv_file: LocalPath, row_header_len: int,
                                   delimiter: str) -> int:
  with csv_file.open(encoding="utf-8") as first_file:
    for csv_row in csv.reader(first_file, delimiter=delimiter):
      if row_header_len == -1:
        # A row_header_len of -1 requests auto-detection from the first row.
        row_header_len = _detect_row_header_len(csv_row)
      assert csv_row, "Mergeable CSV files must have row names."
      row_headers = csv_row[:row_header_len]
      table.append(row_headers)
      csv_row_header_key = tuple(row_headers)
      known_row_headers.add(csv_row_header_key)
  return row_header_len


def _detect_row_header_len(row: List[str]) -> int:
  # Input:  ["header", "", "", "value 1", "value 2"]
  #                             ^
  # Output: 3
  for i, value in enumerate(row):
    if i == 0 or value == "":
      continue
    return i
  return 1


def _merge_csv_append(csv_data: List[List[Any]], table: List[List[Any]],
                      table_headers: List[Any], row_header_len: int,
                      headers: Optional[List[str]],
                      known_row_headers: Set[Tuple[str, ...]],
                      table_row_len: int) -> int:
  """Appends the columns of csv_data to table, matched by row headers."""
  # Find the max row width in the added csv_data.
  max_csv_row_len = max(len(row) for row in csv_data) - row_header_len
  if table:
    table_row_len = len(table[0]) + max_csv_row_len
  else:
    table_row_len = max_csv_row_len

  if headers:
    col_header = [headers.pop(0)]
    table_headers.extend(_ljust_row(col_header, max_csv_row_len))

  # Pre-computed potential padding lists.
  skipped_table_row_padding = [None] * max_csv_row_len
  new_row_padding = [None] * (table_row_len - row_header_len - max_csv_row_len)

  table_index = 0
  for csv_row in csv_data:
    # csv_row_header is a tuple and doubles as the lookup key below.
    csv_row_header = tuple(csv_row[:row_header_len])
    csv_padded_row = _ljust_row(csv_row[row_header_len:], max_csv_row_len)

    if table_index >= len(table):
      # Append all additional rows to the end of the table.
      new_row = list(csv_row_header) + new_row_padding + csv_padded_row
      table.append(new_row)
      table_index += 1
      continue

    table_row = table[table_index]
    table_row_header = tuple(table_row[:row_header_len])

    if table_row_header == csv_row_header:
      # Simple case: the row-header matches the current table row.
      table_row.extend(csv_padded_row)
      table_index += 1
      continue

    # csv_data does not contain the current table_row_header; continue
    # to find a proper insertion point:
    # - if we know the row-header exists, loop until we find the matching
    #   row,
    # - otherwise insert before the first row whose row-header would sort
    #   after csv_row_header (alphabetical comparison).
    try_insert_alpha_sorted = csv_row_header not in known_row_headers
    while True:
      table_row = table[table_index]
      table_row_header = tuple(table_row[:row_header_len])
      if table_row_header == csv_row_header:
        table_row.extend(csv_padded_row)
        break
      if try_insert_alpha_sorted and csv_row_header < table_row_header:
        new_row = list(csv_row_header) + new_row_padding + csv_padded_row
        # Try maintaining alpha-sorting by inserting before the next row.
        table.insert(table_index, new_row)
        known_row_headers.add(csv_row_header)
        break
      table_row.extend(skipped_table_row_padding)
      table_index += 1
      if table_index >= len(table):
        # Append all additional rows to the end of the table.
        new_row = list(csv_row_header) + new_row_padding + csv_padded_row
        table.append(new_row)
        break
    table_index += 1
  return table_row_len
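

# Another illustrative sketch (assumption: pathlib.Path again stands in for
# LocalPath): a row header that only exists in the second file is inserted
# alpha-sorted, and the columns of files that lack it are padded with None.
def _example_merge_csv_missing_rows() -> None:
  import pathlib
  import tempfile

  with tempfile.TemporaryDirectory() as tmp_dir:
    file_a = pathlib.Path(tmp_dir) / "a.csv"
    file_b = pathlib.Path(tmp_dir) / "b.csv"
    file_a.write_text("alpha\t1\ngamma\t3\n", encoding="utf-8")
    file_b.write_text("alpha\t10\nbeta\t20\ngamma\t30\n", encoding="utf-8")
    merged = merge_csv([file_a, file_b])
    # "beta" only appears in b.csv, so its a.csv column is padded with None:
    assert merged == [["alpha", "1", "10"],
                      ["beta", None, "20"],
                      ["gamma", "3", "30"]]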