• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2022 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5from __future__ import annotations
6
7import csv
8from typing import (TYPE_CHECKING, Any, Callable, Dict, Final, List, Optional,
9                    Sequence, Set, Tuple)
10
11if TYPE_CHECKING:
12  from crossbench.path import LocalPath
13
14INTERNAL_NAME_PREFIX: Final[str] = "cb."
15
16KeyFnType = Callable[[Tuple[str, ...]], Optional[str]]
17
18
19def _default_flatten_key_fn(path: Tuple[str, ...]) -> str:
20  return "/".join(path)
21
22
class Flatten:
  """
  Creates a sorted flat dict of (key-path, value) from hierarchical data.

  input = {"a" : {"aa1":1, "aa2":2}, "b": 12 }
  Flatten(input).data == {
    "a/aa1":  1,
    "a/aa2":  2,
    "b":     12,
  }
  """
  # Maps property paths to final result keys; returning None skips an entry.
  _key_fn: KeyFnType
  # Accumulated flat {key: leaf-item} data, in insertion order.
  _accumulator: Dict[str, Any]

  def __init__(self,
               *args: Dict,
               key_fn: Optional[KeyFnType] = None,
               sort: bool = True) -> None:
    """Flattens the given hierarchical dicts into a single flat mapping.

    Args:
        *args (optional): Optional hierarchical data to be flattened
        key_fn (optional): Maps property paths (Tuple[str,...]) to strings used
          as final result keys, or None to skip property paths.
          Defaults to joining path segments with "/".
        sort (optional): Whether the data property returns entries sorted by
          key. Defaults to True.

    Raises:
        ValueError: If two inputs produce the same flattened key.
    """
    self._accumulator = {}
    self._key_fn = key_fn or self._default_key_fn
    self._sort = sort
    self.append(*args)

  @staticmethod
  def _default_key_fn(path: Tuple[str, ...]) -> str:
    """Default key_fn: joins path segments with "/"."""
    return "/".join(path)

  @property
  def data(self) -> Dict[str, Any]:
    """Returns the flattened data, key-sorted unless sort=False."""
    if not self._sort:
      return dict(self._accumulator)
    # Keys are unique, so sorting the items never compares values.
    return dict(sorted(self._accumulator.items()))

  def append(self, *args: Dict, ignore_toplevel: bool = False) -> None:
    """Flattens additional hierarchical dicts into this instance.

    Args:
        *args: Hierarchical data to be flattened.
        ignore_toplevel (optional): Skip leaf values stored directly at the
          top level of the inputs. Defaults to False.
    """
    toplevel_path: Tuple[str, ...] = tuple()
    for merged_data in args:
      self._flatten(toplevel_path, merged_data, ignore_toplevel)

  def _is_leaf_item(self, item: Any) -> bool:
    """Returns True if item is stored as-is instead of being recursed into."""
    if isinstance(item, (str, float, int, list)):
      return True
    # Dicts with a "values" list are treated as pre-aggregated leaf entries.
    if "values" in item and isinstance(item["values"], list):
      return True
    return False

  def _flatten(self,
               parent_path: Tuple[str, ...],
               data: Dict,
               ignore_toplevel: bool = False) -> None:
    """Recursively walks data, accumulating leaf items under path-based keys.

    Raises:
        ValueError: If a flattened key collides with an existing one.
    """
    for name, item in data.items():
      if item is None:
        # None values are silently dropped.
        continue
      path = parent_path + (name,)
      if self._is_leaf_item(item):
        if ignore_toplevel and parent_path == ():
          continue
        key = self._key_fn(path)
        if key is None:
          # key_fn can skip entries by returning None.
          continue
        assert isinstance(key, str)
        if key in self._accumulator:
          raise ValueError(f"Duplicate key='{key}' path={path}")
        self._accumulator[key] = item
      else:
        self._flatten(path, item)
92
93
94def _ljust_row(sequence: List, n: int, fill_value: Any = None) -> List:
95  return sequence + ([fill_value] * (n - len(sequence)))
96
97
def merge_csv(csv_list: Sequence[LocalPath],
              headers: Optional[List[str]] = None,
              row_header_len: int = 1,
              delimiter: str = "\t") -> List[List[Any]]:
  """
  Merge multiple CSV files column-wise, aligning rows by their row-headers.

  File 1:
    Header,     Col Header 1.1, Col Header  1.2
    ...
    Row Header, Data 1.1,       Data 1.2

  File 2:
    Header,     Col Header 2.1,
    ...
    Row Header, Data 2.1,

  The first Col has to contain the same data:

  Merged:
    Header,     Col Header 1.1, Col Header 1.2,  Col Header 2.1,
    ...
    Row Header, Data 1.1,       Data 1.2,        Data 2.1,

  If no column header is available in the files themselves, pass
  headers=[...] with one label per input file; each label is placed above
  that file's data columns. NOTE: the headers list is consumed (popped) during
  the merge.

  Merged with per-file headers:
            , File 1,           , File 2,
  Row Header, Data 1.1, Data 1.2, Data 2.1, Data 2.2

  Args:
      csv_list: Input CSV files; the first file defines the initial row order.
      headers (optional): One column-header label per file; consumed in order.
      row_header_len (optional): Number of leading row-header columns, or -1
        to auto-detect from the first file. Defaults to 1.
      delimiter (optional): CSV field delimiter. Defaults to tab.

  Returns:
      The merged table as a list of rows, with a header row prepended when
      headers were given. Empty list for empty csv_list.
  """
  if not csv_list:
    # Nothing to merge.
    return []
  table: List[List[Any]] = []
  # Initial row-headers from the first csv file.
  known_row_headers: Set[Tuple[str, ...]] = set()
  row_header_len = _merge_csv_prepare_row_headers(table, known_row_headers,
                                                  csv_list[0], row_header_len,
                                                  delimiter)

  # Fill in the header column taken from the first file. The leading None
  # cells sit above the row-header columns.
  table_headers: List[Optional[str]]
  if headers:
    table_headers = [None] * row_header_len
  else:
    table_headers = []

  table_row_len = row_header_len
  for csv_file in csv_list:
    with csv_file.open(encoding="utf-8") as f:
      csv_data = list(csv.reader(f, delimiter=delimiter))
    table_row_len = _merge_csv_append(csv_data, table, table_headers,
                                      row_header_len, headers,
                                      known_row_headers, table_row_len)

  if table_headers:
    return [table_headers] + table
  return table
152
153
154def _merge_csv_prepare_row_headers(table: List[List[Any]],
155                                   known_row_headers: Set[Tuple[str, ...]],
156                                   csv_file: LocalPath, row_header_len: int,
157                                   delimiter: str):
158  with csv_file.open(encoding="utf-8") as first_file:
159    for csv_row in csv.reader(first_file, delimiter=delimiter):
160      if row_header_len == -1:
161        row_header_len = _detect_row_header_len(csv_row)
162      assert csv_row, "Mergeable CSV files must have row names."
163      row_headers = csv_row[:row_header_len]
164      table.append(row_headers)
165      csv_row_header_key = tuple(row_headers)
166      known_row_headers.add(csv_row_header_key)
167  return row_header_len
168
169
170def _detect_row_header_len(row: List[str]) -> int:
171  # Input: ["header", "", "", "value 1", "value 2"]
172  #                        ^
173  # Output: 3
174  for i, value in enumerate(row):
175    if i == 0 or value == "":
176      continue
177    return i
178  return 1
179
def _merge_csv_append(csv_data: List[List[Any]], table: List[List[Any]],
                      table_headers: List[Optional[str]],
                      row_header_len: int, headers: Optional[List[str]],
                      known_row_headers: Set[Tuple[str, ...]],
                      table_row_len: int) -> int:
  """Merges one file's csv_data into table, aligning rows by row-header.

  Existing table rows with a matching row-header get this file's data columns
  appended; unmatched csv rows are either inserted (keeping alpha-order when
  the header is new) or appended at the end, left-padded with None so their
  data lands in this file's columns.

  Mutates table, table_headers and known_row_headers in place, and pops the
  first entry off headers (if given) as this file's column header.

  Returns:
      The new total row length of the merged table.
  """
  # Find the max row width in added csv_data.
  max_csv_row_len = max(len(row) for row in csv_data) - row_header_len
  if table:
    table_row_len = len(table[0]) + max_csv_row_len
  else:
    table_row_len = max_csv_row_len

  if headers:
    # Consume this file's label and pad it across all of its data columns.
    col_header = [headers.pop(0)]
    table_headers.extend(_ljust_row(col_header, max_csv_row_len))

  # Pre-computed potential padding lists.
  skipped_table_row_padding = [None] * max_csv_row_len
  new_row_padding = [None] * (table_row_len - row_header_len - max_csv_row_len)

  table_index = 0
  for csv_row in csv_data:
    csv_row_header = tuple(csv_row[:row_header_len])
    csv_padded_row = _ljust_row(csv_row[row_header_len:], max_csv_row_len)

    if table_index >= len(table):
      # Append all additional rows to the end of the table.
      new_row = list(csv_row_header) + new_row_padding + csv_padded_row
      table.append(new_row)
      table_index += 1
      continue

    table_row = table[table_index]
    table_row_header = tuple(table_row[:row_header_len])

    if table_row_header == csv_row_header:
      # Simple case, row-headers are matching the current table.
      table_row.extend(csv_padded_row)
      table_index += 1
      continue

    csv_row_header_key = tuple(csv_row_header)

    # csv_data does not contain the current table_row_header, continue
    # to find a proper insertion point:
    # - if we know the row-header exists, loop until we find the matching one,
    # - otherwise insert before the next row, whose row-header would be
    #   after csv_row_header when using alpha-compare
    try_insert_alpha_sorted = csv_row_header_key not in known_row_headers
    while True:
      table_row = table[table_index]
      table_row_header = tuple(table_row[:row_header_len])
      if table_row_header == csv_row_header:
        table_row.extend(csv_padded_row)
        break
      if try_insert_alpha_sorted and csv_row_header_key < table_row_header:
        new_row = list(csv_row_header) + new_row_padding + csv_padded_row
        # Try maintaining alpha-sorting by inserting before the next row.
        table.insert(table_index, new_row)
        known_row_headers.add(csv_row_header_key)
        break
      # Rows skipped over have no data in this file; pad them with None.
      table_row.extend(skipped_table_row_padding)
      table_index += 1
      if table_index >= len(table):
        # Append all additional rows to the end of the table.
        new_row = list(csv_row_header) + new_row_padding + csv_padded_row
        table.append(new_row)
        break
    table_index += 1
  return table_row_len
248