# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import csv
import pathlib
import unittest
from typing import List, Optional

from crossbench.probes import helper
from tests import test_helper
from tests.crossbench.base import CrossbenchFakeFsTestCase


class TestMergeCSV(CrossbenchFakeFsTestCase):
  """Tests for helper.merge_csv using an in-memory fake filesystem."""

  def merge(self,
            *args,
            delimiter: str = "\t",
            headers: Optional[List[str]] = None,
            row_header_len: int = 1):
    """Write each arg as a CSV file on the fake fs and merge them all."""
    csv_files = []
    for index, content in enumerate(args):
      csv_file = pathlib.Path(f"test.{index}.csv")
      with csv_file.open("w", newline="", encoding="utf-8") as f:
        csv.writer(f, delimiter=delimiter).writerows(content)
      csv_files.append(csv_file)
    return helper.merge_csv(
        csv_files,
        delimiter=delimiter,
        headers=headers,
        row_header_len=row_header_len)

  def test_merge_single(self):
    # A single complete table merges to itself, regardless of delimiter.
    data = [
        ["Metric", "Run1"],
        ["Total", "200"],
    ]
    for delimiter in ["\t", ","]:
      merged = self.merge(data, delimiter=delimiter)
      self.assertListEqual(merged, data)

  def test_merge_single_padding(self):
    # Short rows are padded with None up to the widest row.
    data = [
        ["Metric", "Run1", "Run2"],
        ["marker"],
        ["Total", "200", "300"],
    ]
    merged = self.merge(data, headers=None)
    self.assertListEqual(merged, [
        ["Metric", "Run1", "Run2"],
        ["marker", None, None],
        ["Total", "200", "300"],
    ])

  def test_merge_single_file_header(self):
    # An explicit per-file header is prepended above the data columns.
    data = [
        ["Total", "200"],
    ]
    for delimiter in ["\t", ","]:
      merged = self.merge(data, delimiter=delimiter, headers=["custom"])
      self.assertListEqual(merged, [
          [None, "custom"],
          ["Total", "200"],
      ])

  def test_merge_two_padding(self):
    # Files with different column counts are padded per file.
    data_1 = [
        ["marker"],
        ["Total", "101", "102"],
    ]
    data_2 = [
        ["marker"],
        ["Total", "201"],
    ]
    merged = self.merge(data_1, data_2, headers=["col_1", "col_2"])
    self.assertListEqual(merged, [
        [None, "col_1", None, "col_2"],
        ["marker", None, None, None],
        ["Total", "101", "102", "201"],
    ])

  def test_merge_two_long_row_header(self):
    # row_header_len > 1 keeps several leading columns as the row key.
    data_1 = [
        ["full-marker", "marker"],
        ["Full/Total", "Total", "101", "102"],
    ]
    data_2 = [
        ["full-marker", "marker"],
        ["Full/Total", "Total", "201"],
    ]
    merged = self.merge(
        data_1, data_2, headers=["col_1", "col_2"], row_header_len=2)
    self.assertListEqual(merged, [
        [None, None, "col_1", None, "col_2"],
        ["full-marker", "marker", None, None, None],
        ["Full/Total", "Total", "101", "102", "201"],
    ])

  def test_merge_two_disjoint_consecutive(self):
    # Non-overlapping row keys appear one after the other.
    data_1 = [
        ["marker"],
        ["A", "101", "102"],
        ["B", "101", "102"],
    ]
    data_2 = [
        ["marker"],
        ["C", "201"],
        ["D", "201"],
    ]
    merged = self.merge(data_1, data_2)
    self.assertListEqual(merged, [
        ["marker", None, None, None],
        ["A", "101", "102", None],
        ["B", "101", "102", None],
        ["C", None, None, "201"],
        ["D", None, None, "201"],
    ])

  def test_merge_two_disjoint_interleaved(self):
    # Rows from both files are interleaved by row key.
    data_1 = [
        ["marker"],
        ["B", "101", "102"],
        ["C", "201"],
    ]
    data_2 = [
        ["marker"],
        ["A", "101", "102"],
        ["D", "201"],
    ]
    merged = self.merge(data_1, data_2)
    self.assertListEqual(merged, [
        ["marker", None, None, None, None],
        ["A", None, None, "101", "102"],
        ["B", "101", "102", None, None],
        ["C", "201", None, None, None],
        ["D", None, None, "201", None],
    ])

  def test_merge_two_missing(self):
    # Rows missing from one file are padded with None in its columns.
    data_1 = [
        ["marker"],
        ["Total-A0"],
        ["Total-A1", "101"],
        ["Total-A2", "111", "112"],
        ["Total-A3", "301", "302"],
        ["Total-B", "01"],
        ["Total-X", "201", "202"],
    ]
    data_2 = [
        ["marker"],
        ["Total-B", "02"],
        ["Total-C1", "401", "402"],
        ["Total-C2", "501"],
        ["Total-C3", "601", "602"],
        ["Total-C4", "701"],
        ["Total-X", "203"],
    ]
    merged = self.merge(data_1, data_2, headers=["col_1", "col_2"])
    self.assertListEqual(merged, [
        [None, "col_1", None, "col_2", None],
        ["marker", None, None, None, None],
        ["Total-A0", None, None, None, None],
        ["Total-A1", "101", None, None, None],
        ["Total-A2", "111", "112", None, None],
        ["Total-A3", "301", "302", None, None],
        ["Total-B", "01", None, "02", None],
        ["Total-C1", None, None, "401", "402"],
        ["Total-C2", None, None, "501", None],
        ["Total-C3", None, None, "601", "602"],
        ["Total-C4", None, None, "701", None],
        ["Total-X", "201", "202", "203", None],
    ])

  def test_merge_two_duplicate(self):
    # Duplicate row keys are matched up positionally across files.
    data_1 = [
        ["A", "101"],
        ["A", "201"],
    ]
    data_2 = [
        ["A", "301"],
        ["A", "401"],
    ]
    merged = self.merge(data_1, data_2)
    self.assertListEqual(merged, [
        ["A", "101", "301"],
        ["A", "201", "401"],
    ])

  def test_merge_two_partial_duplicate(self):
    # Mix of duplicated and file-unique row keys.
    data_1 = [
        ["marker"],
        ["A", "101"],
        ["A", "201"],
        ["B", "B01"],
    ]
    data_2 = [
        ["marker"],
        ["A", "301"],
        ["A", "401"],
        ["C", "C01"],
    ]
    merged = self.merge(data_1, data_2)
    self.assertListEqual(merged, [
        ["marker", None, None],
        ["A", "101", "301"],
        ["A", "201", "401"],
        ["B", "B01", None],
        ["C", None, "C01"],
    ])


class TestFlatten(unittest.TestCase):
  """Tests for helper.Flatten, which flattens nested dicts into '/'-joined keys."""

  def flatten(self, *data, key_fn=None, sort: bool = True):
    """Flatten the given dicts and return the resulting flat dict."""
    return helper.Flatten(*data, key_fn=key_fn, sort=sort).data

  def test_single(self):
    # An already-flat dict is returned unchanged.
    data = {
        "a": 1,
        "b": 2,
    }
    flattened = self.flatten(data)
    self.assertDictEqual(flattened, data)

  def test_single_sort(self):
    # sort=True orders keys; sort=False keeps insertion order.
    data = {
        "b": 2,
        "a": 1,
    }
    flattened_keys = tuple(self.flatten(data, sort=True).keys())
    self.assertTupleEqual(flattened_keys, ("a", "b"))
    flattened_keys = tuple(self.flatten(data, sort=False).keys())
    self.assertTupleEqual(flattened_keys, ("b", "a"))

  def test_single_nested(self):
    # Nested dict keys are joined with "/".
    data = {
        "a": 1,
        "b": {
            "a": 2,
            "b": 3
        },
    }
    flattened = self.flatten(data)
    self.assertDictEqual(flattened, {"a": 1, "b/a": 2, "b/b": 3})

  def test_single_key_fn(self):
    # key_fn can rewrite the key path into a custom flat key.
    data = {
        "a": 1,
        "b": 2,
    }
    flattened = self.flatten(data, key_fn=lambda path: "prefix_" + path[0])
    self.assertDictEqual(flattened, {
        "prefix_a": 1,
        "prefix_b": 2,
    })

  def test_single_key_fn_filtering(self):
    # Returning None from key_fn drops the entry.
    data = {
        "a": 1,
        "b": 2,
    }
    flattened = self.flatten(
        data,
        key_fn=lambda path: None if path[0] == "a" else "prefix_" + path[0])
    self.assertDictEqual(flattened, {
        "prefix_b": 2,
    })

  def test_single_nested_key_fn(self):
    data = {
        "a": 1,
        "b": {
            "a": 2,
            "b": 3
        },
    }
    with self.assertRaises(ValueError):
      # Fail on duplicate entries
      self.flatten(data, key_fn=lambda path: "prefix_" + path[0])

    flattened = self.flatten(
        data, key_fn=lambda path: "prefix_" + "/".join(path))
    self.assertDictEqual(flattened, {
        "prefix_a": 1,
        "prefix_b/a": 2,
        "prefix_b/b": 3,
    })

  def test_single_nested_key_fn_filtering(self):
    # key_fn filtering applies to nested paths as well.
    data = {
        "a": 1,
        "b": {
            "a": 2,
            "b": 3
        },
    }
    flattened = self.flatten(
        data,
        key_fn=lambda path: None
        if path[-1] == "a" else "prefix_" + "/".join(path))
    self.assertDictEqual(flattened, {
        "prefix_b/b": 3,
    })

  def test_multiple_flat(self):
    # Multiple dicts merge; overlapping keys raise ValueError.
    data_1 = {
        "a": 1,
        "b": 2,
    }
    with self.assertRaises(ValueError):
      # duplicate entries
      self.flatten(data_1, data_1)
    data_2 = {
        "c": 3,
        "d": 4,
    }
    flattened = self.flatten(data_1, data_2)
    self.assertDictEqual(flattened, {
        "a": 1,
        "b": 2,
        "c": 3,
        "d": 4,
    })

  def test_null(self):
    # None values are dropped from the flattened output.
    data = {
        "a": 1,
        "b": None,
    }
    flattened = self.flatten(data)
    self.assertDictEqual(flattened, {
        "a": 1,
    })


if __name__ == "__main__":
  test_helper.run_pytest(__file__)