# Copyright 2022 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import datetime as dt
import enum
import pathlib
import unittest

from crossbench import compat, helper, plt
from tests import test_helper
from tests.crossbench.base import CrossbenchFakeFsTestCase


class WaitTestCase(unittest.TestCase):
  """Tests for helper.WaitRange."""

  def test_invalid_wait_ranges(self):
    """Each of these constructor arguments must trip an AssertionError."""
    with self.assertRaises(AssertionError):
      helper.WaitRange(min=-1)
    with self.assertRaises(AssertionError):
      helper.WaitRange(timeout=0)
    with self.assertRaises(AssertionError):
      helper.WaitRange(factor=0.2)
    with self.assertRaises(AssertionError):
      helper.WaitRange(delay=100)

  def test_range(self):
    """Iterating yields intervals growing by `factor` from min up to max."""
    durations = list(
        helper.WaitRange(min=1, max=16, factor=2, max_iterations=5))
    self.assertListEqual(durations, [
        dt.timedelta(seconds=1),
        dt.timedelta(seconds=2),
        dt.timedelta(seconds=4),
        dt.timedelta(seconds=8),
        dt.timedelta(seconds=16)
    ])

  def test_range_with_delay(self):
    """The delay is yielded first, followed by the regular intervals."""
    durations = list(
        helper.WaitRange(min=1, max=16, factor=2, max_iterations=5, delay=5.5))
    self.assertListEqual(durations, [
        dt.timedelta(seconds=5.5),
        dt.timedelta(seconds=1),
        dt.timedelta(seconds=2),
        dt.timedelta(seconds=4),
        dt.timedelta(seconds=8),
        dt.timedelta(seconds=16)
    ])

  def test_range_extended(self):
    """Once max is reached, further iterations keep yielding max."""
    durations = list(
        helper.WaitRange(min=1, max=16, factor=2, max_iterations=5 + 4))
    self.assertListEqual(
        durations,
        [
            dt.timedelta(seconds=1),
            dt.timedelta(seconds=2),
            dt.timedelta(seconds=4),
            dt.timedelta(seconds=8),
            dt.timedelta(seconds=16),
            # After 5 iterations the interval is no longer increased
            dt.timedelta(seconds=16),
            dt.timedelta(seconds=16),
            dt.timedelta(seconds=16),
            dt.timedelta(seconds=16)
        ])

  def test_wait_with_backoff(self):
    """time_spent grows and time_left shrinks by at least the slept delta."""
    data = []
    delta = 0.0005
    for time_spent, time_left in helper.WaitRange(
        min=0.01, max=0.05).wait_with_backoff():
      data.append((time_spent, time_left))
      # Two samples are enough to compare; stop before sleeping again.
      if len(data) == 2:
        break
      plt.PLATFORM.sleep(delta)
    self.assertEqual(len(data), 2)
    first_time_spent, first_time_left = data[0]
    second_time_spent, second_time_left = data[1]
    self.assertLessEqual(first_time_spent + delta, second_time_spent)
    self.assertGreaterEqual(first_time_left, second_time_left + delta)


class DurationsTestCase(unittest.TestCase):
  """Tests for helper.Durations."""

  def test_single(self):
    """A fresh instance is empty; one measurement adds exactly one entry."""
    durations = helper.Durations()
    self.assertEqual(len(durations), 0)
    self.assertDictEqual(durations.to_json(), {})
    with durations.measure("a"):
      pass
    self.assertGreaterEqual(durations["a"].total_seconds(), 0)
    self.assertEqual(len(durations), 1)

  def test_invalid_twice(self):
    """Measuring the same name twice asserts and keeps the first entry."""
    durations = helper.Durations()
    with durations.measure("a"):
      pass
    with self.assertRaises(AssertionError):
      with durations.measure("a"):
        pass
    self.assertEqual(len(durations), 1)
    self.assertListEqual(list(durations.to_json().keys()), ["a"])

  def test_multiple(self):
    """Distinct names are recorded and reported in measurement order."""
    durations = helper.Durations()
    for name in ["a", "b", "c"]:
      with durations.measure(name):
        pass
    self.assertEqual(len(durations), 3)
    self.assertListEqual(list(durations.to_json().keys()), ["a", "b", "c"])


class ChangeCWDTestCase(CrossbenchFakeFsTestCase):
  """Tests for helper.ChangeCWD."""

  def test_basic(self):
    """The cwd is switched inside the context and restored afterwards."""
    old_cwd = pathlib.Path.cwd()
    new_cwd = pathlib.Path("/foo/bar").absolute()
    new_cwd.mkdir(parents=True)
    with helper.ChangeCWD(new_cwd):
      self.assertNotEqual(old_cwd, pathlib.Path.cwd())
      self.assertEqual(new_cwd, pathlib.Path.cwd())
    self.assertEqual(old_cwd, pathlib.Path.cwd())
    self.assertNotEqual(new_cwd, pathlib.Path.cwd())


class FileSizeTestCase(CrossbenchFakeFsTestCase):
  """Tests for helper.get_file_size and helper.sort_by_file_size."""

  def test_empty(self):
    """An empty file is reported as "0.00 B"."""
    test_file = pathlib.Path("test.txt")
    test_file.touch()
    size = helper.get_file_size(test_file)
    self.assertEqual(size, "0.00 B")

  def test_bytes(self):
    """A 501 byte file is reported in bytes."""
    test_file = pathlib.Path("test.txt")
    self.fs.create_file(test_file, st_size=501)
    size = helper.get_file_size(test_file)
    self.assertEqual(size, "501.00 B")

  def test_kib(self):
    """A 2048 byte file is reported as "2.00 KiB"."""
    test_file = pathlib.Path("test.txt")
    self.fs.create_file(test_file, st_size=1024 * 2)
    size = helper.get_file_size(test_file)
    self.assertEqual(size, "2.00 KiB")

  def test_kib_fraction(self):
    """Fractional KiB sizes are formatted with two decimals."""
    test_file = pathlib.Path("test.txt")
    self.fs.create_file(test_file, st_size=int(1024 * 2.51))
    size = helper.get_file_size(test_file)
    self.assertEqual(size, "2.51 KiB")

  def test_sort_by_file_size(self):
    """Files come back smallest first regardless of the input order."""
    small = pathlib.Path("smol")
    medium = pathlib.Path("medium")
    large = pathlib.Path("laaaarge")
    self.fs.create_file(small, st_size=100)
    self.fs.create_file(medium, st_size=200)
    self.fs.create_file(large, st_size=300)
    result = helper.sort_by_file_size([small, medium, large])
    self.assertListEqual(result, [small, medium, large])
    result = helper.sort_by_file_size([medium, large, small])
    self.assertListEqual(result, [small, medium, large])
    result = helper.sort_by_file_size([large, medium, small])
    self.assertListEqual(result, [small, medium, large])


class GroupByTestCase(unittest.TestCase):
  """Tests for helper.group_by."""

  def test_empty(self):
    """Empty input produces an empty dict."""
    grouped = helper.group_by([], key=str)
    self.assertDictEqual({}, grouped)

  def test_basic(self):
    """Items are collected into lists under their key."""
    grouped = helper.group_by([1, 1, 1, 2, 2, 3], key=str)
    self.assertListEqual(list(grouped.keys()), ["1", "2", "3"])
    self.assertDictEqual({"1": [1, 1, 1], "2": [2, 2], "3": [3]}, grouped)

  def test_basic_out_of_order(self):
    """With the default sort_key the resulting keys are sorted."""
    grouped = helper.group_by([2, 3, 2, 1, 1, 1], key=str)
    self.assertListEqual(list(grouped.keys()), ["1", "2", "3"])
    self.assertDictEqual({"1": [1, 1, 1], "2": [2, 2], "3": [3]}, grouped)

  def test_basic_input_order(self):
    """With sort_key=None the keys keep their first-seen input order."""
    grouped = helper.group_by([2, 3, 2, 1, 1, 1], key=str, sort_key=None)
    self.assertListEqual(list(grouped.keys()), ["2", "3", "1"])
    self.assertDictEqual({"1": [1, 1, 1], "2": [2, 2], "3": [3]}, grouped)

  def test_basic_custom_order(self):
    """A custom sort_key controls the order of the resulting keys."""
    grouped = helper.group_by([2, 3, 2, 1, 1, 1],
                              key=str,
                              sort_key=lambda item: int(item[0]))
    self.assertListEqual(list(grouped.keys()), ["1", "2", "3"])
    self.assertDictEqual({"1": [1, 1, 1], "2": [2, 2], "3": [3]}, grouped)
    # Try reverse sorting
    grouped = helper.group_by([2, 3, 2, 1, 1, 1],
                              key=str,
                              sort_key=lambda item: -int(item[0]))
    self.assertListEqual(list(grouped.keys()), ["3", "2", "1"])
    self.assertDictEqual({"1": [1, 1, 1], "2": [2, 2], "3": [3]}, grouped)

  def test_custom_key(self):
    """Non-string key functions (here: int) are supported."""
    grouped = helper.group_by([1.1, 1.2, 1.3, 2.1, 2.2, 3.1], key=int)
    self.assertDictEqual({1: [1.1, 1.2, 1.3], 2: [2.1, 2.2], 3: [3.1]}, grouped)

  def test_custom_value(self):
    """The value function transforms each item before it is stored."""
    grouped = helper.group_by([1, 1, 1, 2, 2, 3],
                              key=str,
                              value=lambda x: x * 100)
    self.assertDictEqual({
        "1": [100, 100, 100],
        "2": [200, 200],
        "3": [300]
    }, grouped)

  def test_custom_group(self):
    """The group function provides the initial container for each key."""
    grouped = helper.group_by([1, 1, 1, 2, 2, 3],
                              key=str,
                              group=lambda key: ["custom"])
    self.assertDictEqual(
        {
            "1": ["custom", 1, 1, 1],
            "2": ["custom", 2, 2],
            "3": ["custom", 3]
        }, grouped)

  def test_custom_group_out_of_order(self):
    """Custom groups are filled correctly for out-of-order input."""
    # Deliberately shuffled input: the in-order case is already covered by
    # test_custom_group.
    grouped = helper.group_by([2, 3, 2, 1, 1, 1],
                              key=str,
                              group=lambda key: ["custom"])
    self.assertListEqual(list(grouped.keys()), ["1", "2", "3"])
    self.assertDictEqual(
        {
            "1": ["custom", 1, 1, 1],
            "2": ["custom", 2, 2],
            "3": ["custom", 3]
        }, grouped)


class ConcatFilesTestCase(CrossbenchFakeFsTestCase):
  """Tests for the platform's concat_files."""

  def setUp(self):
    super().setUp()
    self.platform = plt.PLATFORM

  def test_single(self):
    """A single input file is copied verbatim to the output."""
    input_file = pathlib.Path("input")
    self.fs.create_file(input_file, contents="input content")
    output = pathlib.Path("ouput")
    self.platform.concat_files([input_file], output)
    self.assertEqual(output.read_text(encoding="utf-8"), "input content")

  def test_multiple(self):
    """Multiple inputs are written to the output in argument order."""
    input_a = pathlib.Path("input_1")
    self.fs.create_file(input_a, contents="AAA")
    input_b = pathlib.Path("input_2")
    self.fs.create_file(input_b, contents="BBB")
    output = pathlib.Path("ouput")
    self.platform.concat_files([input_a, input_b], output)
    self.assertEqual(output.read_text(encoding="utf-8"), "AAABBB")


class StrEnumWithHelpTestCase(unittest.TestCase):
  """Tests for compat.StrEnumWithHelp."""

  @enum.unique
  class TestEnum(compat.StrEnumWithHelp):
    A = ("a", "help a")
    B = ("b", "help b")

  def test_lookup(self):
    """Members can be looked up both by value and by name."""
    self.assertIs(self.TestEnum("a"), self.TestEnum.A)  # pytype: disable=wrong-arg-types
    self.assertIs(self.TestEnum("b"), self.TestEnum.B)  # pytype: disable=wrong-arg-types
    self.assertIs(self.TestEnum["A"], self.TestEnum.A)
    self.assertIs(self.TestEnum["B"], self.TestEnum.B)

  def test_value_help(self):
    """Each member exposes its name, value and help text."""
    # pylint: disable=no-member
    self.assertEqual(self.TestEnum.A.name, "A")
    self.assertEqual(self.TestEnum.B.name, "B")
    self.assertEqual(self.TestEnum.A.value, "a")
    self.assertEqual(self.TestEnum.B.value, "b")
    self.assertEqual(self.TestEnum.A.help, "help a")
    self.assertEqual(self.TestEnum.B.help, "help b")

  def test_in(self):
    """Members are contained in their enum class."""
    self.assertIn(self.TestEnum.A, self.TestEnum)
    self.assertIn(self.TestEnum.B, self.TestEnum)

  def test_str(self):
    """str() of a member is its plain value."""
    self.assertEqual(str(self.TestEnum.A), "a")
    self.assertEqual(str(self.TestEnum.B), "b")

  def test_list(self):
    """Iterating the enum yields the members in definition order."""
    self.assertEqual(len(self.TestEnum), 2)
    self.assertListEqual(
        list(self.TestEnum), [self.TestEnum.A, self.TestEnum.B])

  def test_help_items(self):
    """help_text_items() pairs the quoted value with its help text."""
    self.assertListEqual(self.TestEnum.help_text_items(), [
        ("'a'", "help a"),
        ("'b'", "help b"),
    ])


class UpdateUrlQueryTestCase(unittest.TestCase):
  """Tests for helper.update_url_query."""

  def test_empty(self):
    """An empty update dict leaves the URL unchanged."""
    self.assertEqual("http://test.com",
                     helper.update_url_query("http://test.com", {}))
    self.assertEqual("https://test.com",
                     helper.update_url_query("https://test.com", {}))
    self.assertEqual("https://test.com?foo=bar",
                     helper.update_url_query("https://test.com?foo=bar", {}))

  def test_empty_add(self):
    """New params are appended to the query, ahead of any fragment."""
    self.assertEqual("http://test.com?foo=bar",
                     helper.update_url_query("http://test.com", {"foo": "bar"}))
    self.assertEqual(
        "http://test.com?foo=bar#status",
        helper.update_url_query("http://test.com#status", {"foo": "bar"}))
    self.assertEqual(
        "http://test.com?xyz=10&foo=bar#status",
        helper.update_url_query("http://test.com?xyz=10#status",
                                {"foo": "bar"}))

  def test_override(self):
    """Existing params are overwritten in place, keeping their position."""
    self.assertEqual(
        "http://test.com?foo=bar",
        helper.update_url_query("http://test.com?foo=BAR", {"foo": "bar"}))
    self.assertEqual(
        "http://test.com?foo=bar#status",
        helper.update_url_query("http://test.com?foo=BAR#status",
                                {"foo": "bar"}))
    self.assertEqual(
        "http://test.com?foo=bar&xyz=10#status",
        helper.update_url_query("http://test.com?foo=BAR&xyz=10#status",
                                {"foo": "bar"}))


if __name__ == "__main__":
  test_helper.run_pytest(__file__)