# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import csv
import pathlib
import unittest
from typing import List, Optional

from crossbench.probes import helper
from tests import test_helper
from tests.crossbench.base import CrossbenchFakeFsTestCase

class TestMergeCSV(CrossbenchFakeFsTestCase):
  """Unit tests for helper.merge_csv, run against an in-memory fake fs."""

  def merge(self,
            *args,
            delimiter: str = "\t",
            headers: Optional[List[str]] = None,
            row_header_len: int = 1):
    """Serialize each input table to its own CSV file and merge the files.

    Every positional arg is a list of rows; each table is written to a
    separate "test.<i>.csv" file before being handed to helper.merge_csv.
    """
    paths = []
    for index, rows in enumerate(args):
      path = pathlib.Path(f"test.{index}.csv")
      with path.open("w", newline="", encoding="utf-8") as out_file:
        writer = csv.writer(out_file, delimiter=delimiter)
        writer.writerows(rows)
      paths.append(path)
    return helper.merge_csv(
        paths,
        delimiter=delimiter,
        headers=headers,
        row_header_len=row_header_len)

  def test_merge_single(self):
    # A single well-formed table round-trips unchanged.
    table = [
        ["Metric", "Run1"],
        ["Total", "200"],
    ]
    for delimiter in ("\t", ","):
      self.assertListEqual(self.merge(table, delimiter=delimiter), table)

  def test_merge_single_padding(self):
    # Short rows are padded with None up to the table width.
    table = [
        ["Metric", "Run1", "Run2"],
        ["marker"],
        ["Total", "200", "300"],
    ]
    expected = [
        ["Metric", "Run1", "Run2"],
        ["marker", None, None],
        ["Total", "200", "300"],
    ]
    self.assertListEqual(self.merge(table, headers=None), expected)

  def test_merge_single_file_header(self):
    # Explicit headers prepend a header row; the row-header column is None.
    table = [
        ["Total", "200"],
    ]
    expected = [
        [None, "custom"],
        ["Total", "200"],
    ]
    for delimiter in ("\t", ","):
      merged = self.merge(table, delimiter=delimiter, headers=["custom"])
      self.assertListEqual(merged, expected)

  def test_merge_two_padding(self):
    # Tables of different widths are padded so data columns stay aligned.
    left = [
        ["marker"],
        ["Total", "101", "102"],
    ]
    right = [
        ["marker"],
        ["Total", "201"],
    ]
    merged = self.merge(left, right, headers=["col_1", "col_2"])
    self.assertListEqual(merged, [
        [None, "col_1", None, "col_2"],
        ["marker", None, None, None],
        ["Total", "101", "102", "201"],
    ])

  def test_merge_two_long_row_header(self):
    # With row_header_len=2 the first two columns form the row key.
    left = [
        ["full-marker", "marker"],
        ["Full/Total", "Total", "101", "102"],
    ]
    right = [
        ["full-marker", "marker"],
        ["Full/Total", "Total", "201"],
    ]
    merged = self.merge(
        left, right, headers=["col_1", "col_2"], row_header_len=2)
    self.assertListEqual(merged, [
        [None, None, "col_1", None, "col_2"],
        ["full-marker", "marker", None, None, None],
        ["Full/Total", "Total", "101", "102", "201"],
    ])

  def test_merge_two_disjoint_consecutive(self):
    # Row keys unique to one file keep None in the other file's columns.
    left = [
        ["marker"],
        ["A", "101", "102"],
        ["B", "101", "102"],
    ]
    right = [
        ["marker"],
        ["C", "201"],
        ["D", "201"],
    ]
    self.assertListEqual(
        self.merge(left, right), [
            ["marker", None, None, None],
            ["A", "101", "102", None],
            ["B", "101", "102", None],
            ["C", None, None, "201"],
            ["D", None, None, "201"],
        ])

  def test_merge_two_disjoint_interleaved(self):
    # Disjoint keys from both files end up sorted in the merged output.
    left = [
        ["marker"],
        ["B", "101", "102"],
        ["C", "201"],
    ]
    right = [
        ["marker"],
        ["A", "101", "102"],
        ["D", "201"],
    ]
    self.assertListEqual(
        self.merge(left, right), [
            ["marker", None, None, None, None],
            ["A", None, None, "101", "102"],
            ["B", "101", "102", None, None],
            ["C", "201", None, None, None],
            ["D", None, None, "201", None],
        ])

  def test_merge_two_missing(self):
    # Mixed case: shared keys, one-sided keys and ragged row widths.
    left = [
        ["marker"],
        ["Total-A0"],
        ["Total-A1", "101"],
        ["Total-A2", "111", "112"],
        ["Total-A3", "301", "302"],
        ["Total-B", "01"],
        ["Total-X", "201", "202"],
    ]
    right = [
        ["marker"],
        ["Total-B", "02"],
        ["Total-C1", "401", "402"],
        ["Total-C2", "501"],
        ["Total-C3", "601", "602"],
        ["Total-C4", "701"],
        ["Total-X", "203"],
    ]
    merged = self.merge(left, right, headers=["col_1", "col_2"])
    self.assertListEqual(merged, [
        [None, "col_1", None, "col_2", None],
        ["marker", None, None, None, None],
        ["Total-A0", None, None, None, None],
        ["Total-A1", "101", None, None, None],
        ["Total-A2", "111", "112", None, None],
        ["Total-A3", "301", "302", None, None],
        ["Total-B", "01", None, "02", None],
        ["Total-C1", None, None, "401", "402"],
        ["Total-C2", None, None, "501", None],
        ["Total-C3", None, None, "601", "602"],
        ["Total-C4", None, None, "701", None],
        ["Total-X", "201", "202", "203", None],
    ])

  def test_merge_two_duplicate(self):
    # Duplicate row keys are merged pairwise, in order of appearance.
    left = [
        ["A", "101"],
        ["A", "201"],
    ]
    right = [
        ["A", "301"],
        ["A", "401"],
    ]
    self.assertListEqual(
        self.merge(left, right), [
            ["A", "101", "301"],
            ["A", "201", "401"],
        ])

  def test_merge_two_partial_duplicate(self):
    # Duplicate keys merge pairwise; unique keys pad the other side.
    left = [
        ["marker"],
        ["A", "101"],
        ["A", "201"],
        ["B", "B01"],
    ]
    right = [
        ["marker"],
        ["A", "301"],
        ["A", "401"],
        ["C", "C01"],
    ]
    self.assertListEqual(
        self.merge(left, right), [
            ["marker", None, None],
            ["A", "101", "301"],
            ["A", "201", "401"],
            ["B", "B01", None],
            ["C", None, "C01"],
        ])
211
212
class TestFlatten(unittest.TestCase):
  """Unit tests for helper.Flatten."""

  def flatten(self, *data, key_fn=None, sort: bool = True):
    """Flatten the given dicts with helper.Flatten and return the result."""
    return helper.Flatten(*data, key_fn=key_fn, sort=sort).data

  def test_single(self):
    # A flat dict comes back unchanged.
    data = {
        "a": 1,
        "b": 2,
    }
    self.assertDictEqual(self.flatten(data), data)

  def test_single_sort(self):
    # sort=True orders keys alphabetically; sort=False keeps input order.
    data = {
        "b": 2,
        "a": 1,
    }
    self.assertTupleEqual(tuple(self.flatten(data, sort=True)), ("a", "b"))
    self.assertTupleEqual(tuple(self.flatten(data, sort=False)), ("b", "a"))

  def test_single_nested(self):
    # Nested keys are joined with "/".
    data = {
        "a": 1,
        "b": {
            "a": 2,
            "b": 3
        },
    }
    self.assertDictEqual(self.flatten(data), {"a": 1, "b/a": 2, "b/b": 3})

  def test_single_key_fn(self):
    # key_fn rewrites each key path into a custom flat key.
    data = {
        "a": 1,
        "b": 2,
    }
    result = self.flatten(data, key_fn=lambda path: "prefix_" + path[0])
    self.assertDictEqual(result, {
        "prefix_a": 1,
        "prefix_b": 2,
    })

  def test_single_key_fn_filtering(self):
    # A key_fn returning None drops the corresponding entry.
    data = {
        "a": 1,
        "b": 2,
    }

    def key_fn(path):
      if path[0] == "a":
        return None
      return "prefix_" + path[0]

    self.assertDictEqual(self.flatten(data, key_fn=key_fn), {
        "prefix_b": 2,
    })

  def test_single_nested_key_fn(self):
    data = {
        "a": 1,
        "b": {
            "a": 2,
            "b": 3
        },
    }
    # Collapsing distinct nested paths onto the same key must raise.
    with self.assertRaises(ValueError):
      self.flatten(data, key_fn=lambda path: "prefix_" + path[0])

    result = self.flatten(
        data, key_fn=lambda path: "prefix_" + "/".join(path))
    self.assertDictEqual(result, {
        "prefix_a": 1,
        "prefix_b/a": 2,
        "prefix_b/b": 3,
    })

  def test_single_nested_key_fn_filtering(self):
    data = {
        "a": 1,
        "b": {
            "a": 2,
            "b": 3
        },
    }

    def key_fn(path):
      # Drop every path whose leaf key is "a".
      if path[-1] == "a":
        return None
      return "prefix_" + "/".join(path)

    self.assertDictEqual(self.flatten(data, key_fn=key_fn), {
        "prefix_b/b": 3,
    })

  def test_multiple_flat(self):
    first = {
        "a": 1,
        "b": 2,
    }
    # Flattening the same dict twice produces duplicate keys and raises.
    with self.assertRaises(ValueError):
      self.flatten(first, first)
    second = {
        "c": 3,
        "d": 4,
    }
    self.assertDictEqual(
        self.flatten(first, second), {
            "a": 1,
            "b": 2,
            "c": 3,
            "d": 4,
        })

  def test_null(self):
    # None values are filtered out of the flattened result.
    data = {
        "a": 1,
        "b": None,
    }
    self.assertDictEqual(self.flatten(data), {
        "a": 1,
    })
335
336
# Allow running this test file directly via pytest.
if __name__ == "__main__":
  test_helper.run_pytest(__file__)