# Copyright 2014 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Shared logic for running test suites on local Android devices.

LocalDeviceTestRun drives sharded, retried test execution across the devices
in a LocalDeviceEnvironment; concrete suites subclass it and implement the
_GetTests/_RunTest family of hooks.
"""

import fnmatch
import hashlib
import logging
import posixpath
import signal
try:
  import _thread as thread
except ImportError:
  import thread
import threading

from devil import base_error
from devil.android import crash_handler
from devil.android import device_errors
from devil.android.sdk import version_codes
from devil.android.tools import device_recovery
from devil.utils import signal_handler
from pylib.base import base_test_result
from pylib.base import test_collection
from pylib.base import test_exception
from pylib.base import test_run
from pylib.local.device import local_device_environment


# Log message attached to tests whose results are still unknown when the
# suite is killed by SIGTERM (e.g. swarming timeout).
_SIGTERM_TEST_LOG = (
    '  Suite execution terminated, probably due to swarming timeout.\n'
    '  Your test may not have run.')


def SubstituteDeviceRoot(device_path, device_root):
  """Returns a device path with empty components replaced by |device_root|.

  Args:
    device_path: None, a string path, or a list of path components where a
        falsy component stands for the device root.
    device_root: The path to substitute for empty/None components.
  """
  if not device_path:
    return device_root
  if isinstance(device_path, list):
    return posixpath.join(*(p if p else device_root for p in device_path))
  return device_path


class TestsTerminated(Exception):
  """Raised to abort test execution after receiving SIGTERM."""


class LocalDeviceTestRun(test_run.TestRun):
  """Base class for test runs that execute on locally-attached devices."""

  def __init__(self, env, test_instance):
    super().__init__(env, test_instance)
    # This is intended to be filled by a child class.
    self._installed_packages = []
    env.SetPreferredAbis(test_instance.GetPreferredAbis())

  #override
  def RunTests(self, results, raw_logs_fh=None):
    """Runs the suite's tests, retrying failures up to env.max_tries times.

    Args:
      results: A list that a TestRunResults object is appended to for each
          try as soon as test names are known.
      raw_logs_fh: Unused here; accepted for interface compatibility.
    """
    tests = self._GetTests()

    exit_now = threading.Event()

    @local_device_environment.handle_shard_failures
    def run_tests_on_device(dev, tests, results):
      # This is performed here instead of during setup because restarting the
      # device clears app compatibility flags, which will happen if a device
      # needs to be recovered.
      SetAppCompatibilityFlagsIfNecessary(self._installed_packages, dev)
      consecutive_device_errors = 0
      for test in tests:
        if not test:
          logging.warning('No tests in shard. Continuing.')
          tests.test_completed()
          continue
        # is_set() is the supported spelling; isSet() is deprecated since
        # Python 3.10.
        if exit_now.is_set():
          thread.exit()

        result = None
        rerun = None
        try:
          result, rerun = crash_handler.RetryOnSystemCrash(
              lambda d, t=test: self._RunTest(d, t),
              device=dev)
          consecutive_device_errors = 0
          if isinstance(result, base_test_result.BaseTestResult):
            results.AddResult(result)
          elif isinstance(result, list):
            results.AddResults(result)
          else:
            raise Exception(
                'Unexpected result type: %s' % type(result).__name__)
        except device_errors.CommandTimeoutError:
          # Test timeouts don't count as device errors for the purpose
          # of bad device detection.
          consecutive_device_errors = 0

          if isinstance(test, list):
            result_log = ''
            if len(test) > 1:
              result_log = ('The test command timed out when running multiple '
                            'tests including this test. It does not '
                            'necessarily mean this specific test timed out.')
              # Ensure instrumentation tests not batched at env level retries.
              for t in test:
                # |dict| type infers it's an instrumentation test.
                if isinstance(t, dict) and t['annotations']:
                  t['annotations'].pop('Batch', None)

            results.AddResults(
                base_test_result.BaseTestResult(
                    self._GetUniqueTestName(t),
                    base_test_result.ResultType.TIMEOUT,
                    log=result_log) for t in test)
          else:
            results.AddResult(
                base_test_result.BaseTestResult(
                    self._GetUniqueTestName(test),
                    base_test_result.ResultType.TIMEOUT))
        except device_errors.DeviceUnreachableError:
          # If the device is no longer reachable then terminate this
          # run_tests_on_device call.
          raise
        except base_error.BaseError:
          # If we get a device error but believe the device is still
          # reachable, attempt to continue using it.
          if isinstance(tests, test_collection.TestCollection):
            rerun = test

          consecutive_device_errors += 1
          if consecutive_device_errors >= 3:
            # We believe the device is still reachable and may still be usable,
            # but if it fails repeatedly, we shouldn't attempt to keep using
            # it.
            logging.error('Repeated failures on device %s. Abandoning.',
                          str(dev))
            raise

          logging.exception(
              'Attempting to continue using device %s despite failure (%d/3).',
              str(dev), consecutive_device_errors)

        finally:
          if isinstance(tests, test_collection.TestCollection):
            if rerun:
              tests.add(rerun)
            tests.test_completed()

      logging.info('Finished running tests on this device.')

    def stop_tests(_signum, _frame):
      # SIGTERM handler: flag all workers to exit, then unwind this thread.
      logging.critical('Received SIGTERM. Stopping test execution.')
      exit_now.set()
      raise TestsTerminated()

    try:
      with signal_handler.AddSignalHandler(signal.SIGTERM, stop_tests):
        self._env.ResetCurrentTry()
        while self._env.current_try < self._env.max_tries and tests:
          tries = self._env.current_try
          tests = self._SortTests(tests)
          grouped_tests = self._GroupTestsAfterSharding(tests)
          logging.info('STARTING TRY #%d/%d', tries + 1, self._env.max_tries)
          if tries > 0 and self._env.recover_devices:
            if any(d.build_version_sdk == version_codes.LOLLIPOP_MR1
                   for d in self._env.devices):
              logging.info(
                  'Attempting to recover devices due to known issue on L MR1. '
                  'See crbug.com/787056 for details.')
              self._env.parallel_devices.pMap(
                  device_recovery.RecoverDevice, None)
            elif tries + 1 == self._env.max_tries:
              logging.info(
                  'Attempting to recover devices prior to last test attempt.')
              self._env.parallel_devices.pMap(
                  device_recovery.RecoverDevice, None)
          logging.info(
              'Will run %d tests, grouped into %d groups, on %d devices: %s',
              len(tests), len(grouped_tests), len(self._env.devices),
              ', '.join(str(d) for d in self._env.devices))
          for t in tests:
            logging.debug('  %s', t)

          try_results = base_test_result.TestRunResults()
          test_names = (self._GetUniqueTestName(t) for t in tests)
          try_results.AddResults(
              base_test_result.BaseTestResult(
                  t, base_test_result.ResultType.NOTRUN)
              for t in test_names if not t.endswith('*'))

          # As soon as we know the names of the tests, we populate |results|.
          # The tests in try_results will have their results updated by
          # try_results.AddResult() as they are run.
          results.append(try_results)

          try:
            if self._ShouldShardTestsForDevices():
              tc = test_collection.TestCollection(
                  self._CreateShardsForDevices(grouped_tests))
              self._env.parallel_devices.pMap(
                  run_tests_on_device, tc, try_results).pGet(None)
            else:
              self._env.parallel_devices.pMap(run_tests_on_device,
                                              grouped_tests,
                                              try_results).pGet(None)
          except TestsTerminated:
            # Mark every still-unknown test as TIMEOUT before re-raising so
            # the results make the SIGTERM visible.
            for unknown_result in try_results.GetUnknown():
              try_results.AddResult(
                  base_test_result.BaseTestResult(
                      unknown_result.GetName(),
                      base_test_result.ResultType.TIMEOUT,
                      log=_SIGTERM_TEST_LOG))
            raise

          self._env.IncrementCurrentTry()
          tests = self._GetTestsToRetry(tests, try_results)

          logging.info('FINISHED TRY #%d/%d', tries + 1, self._env.max_tries)
          if tests:
            logging.info('%d failed tests remain.', len(tests))
          else:
            logging.info('All tests completed.')
    except TestsTerminated:
      pass

  def _GetTestsToRetry(self, tests, try_results):
    """Returns the subset of |tests| that failed in |try_results|.

    A trailing-'*' test name is treated as a wildcard matching all collected
    result names. Retry candidates are filtered through _ShouldRetry and
    extended via _AppendPreTestsForRetry.
    """

    def is_failure_result(test_result):
      if isinstance(test_result, list):
        return any(is_failure_result(r) for r in test_result)
      return (
          test_result is None
          or test_result.GetType() not in (
              base_test_result.ResultType.PASS,
              base_test_result.ResultType.SKIP))

    all_test_results = {r.GetName(): r for r in try_results.GetAll()}

    tests_and_names = ((t, self._GetUniqueTestName(t)) for t in tests)

    tests_and_results = {}
    for test, name in tests_and_names:
      if name.endswith('*'):
        tests_and_results[name] = (test, [
            r for n, r in all_test_results.items() if fnmatch.fnmatch(n, name)
        ])
      else:
        tests_and_results[name] = (test, all_test_results.get(name))

    failed_tests_and_results = ((test, result)
                                for test, result in tests_and_results.values()
                                if is_failure_result(result))

    failed_tests = [
        t for t, r in failed_tests_and_results if self._ShouldRetry(t, r)
    ]
    return self._AppendPreTestsForRetry(failed_tests, tests)

  def _ApplyExternalSharding(self, tests, shard_index, total_shards):
    """Returns the tests assigned to this shard under external sharding.

    Raises:
      test_exception.InvalidShardingSettings: If the shard settings are
          out of range.
    """
    logging.info('Using external sharding settings. This is shard %d/%d',
                 shard_index, total_shards)

    if total_shards < 0 or shard_index < 0 or total_shards <= shard_index:
      raise test_exception.InvalidShardingSettings(shard_index, total_shards)

    sharded_tests = []

    # Sort tests by hash.
    # TODO(crbug.com/40200835): Add sorting logic back to _PartitionTests.
    tests = self._SortTests(tests)

    # Group tests by tests that should run in the same test invocation - either
    # unit tests or batched tests.
    grouped_tests = self._GroupTests(tests)

    # Partition grouped tests approximately evenly across shards.
    partitioned_tests = self._PartitionTests(grouped_tests, total_shards,
                                             float('inf'))
    if len(partitioned_tests) <= shard_index:
      return []
    for t in partitioned_tests[shard_index]:
      if isinstance(t, list):
        sharded_tests.extend(t)
      else:
        sharded_tests.append(t)
    return sharded_tests

  # Sort by hash so we don't put all tests in a slow suite in the same
  # partition.
  def _SortTests(self, tests):
    return sorted(tests,
                  key=lambda t: hashlib.sha256(
                      self._GetUniqueTestName(t[0] if isinstance(t, list) else t
                                              ).encode()).hexdigest())

  # Partition tests evenly into |num_desired_partitions| partitions where
  # possible. However, many constraints make partitioning perfectly impossible.
  # If the max_partition_size isn't large enough, extra partitions may be
  # created (infinite max size should always return precisely the desired
  # number of partitions). Even if the |max_partition_size| is technically large
  # enough to hold all of the tests in |num_desired_partitions|, we attempt to
  # keep test order relatively stable to minimize flakes, so when tests are
  # grouped (eg. batched tests), we cannot perfectly fill all partitions as that
  # would require breaking up groups.
  def _PartitionTests(self, tests, num_desired_partitions, max_partition_size):
    partitions = []

    num_not_yet_allocated = sum(
        [len(test) - 1 for test in tests if self._CountTestsIndividually(test)])
    num_not_yet_allocated += len(tests)

    # Fast linear partition approximation capped by max_partition_size. We
    # cannot round-robin or otherwise re-order tests dynamically because we want
    # test order to remain stable.
    partition_size = min(num_not_yet_allocated // num_desired_partitions,
                         max_partition_size)
    partitions.append([])
    last_partition_size = 0
    for test in tests:
      test_count = len(test) if self._CountTestsIndividually(test) else 1
      # Make a new shard whenever we would overfill the previous one. However,
      # if the size of the test group is larger than the max partition size on
      # its own, just put the group in its own shard instead of splitting up the
      # group.
      # TODO(crbug.com/40200835): Add logic to support PRE_ test recognition but
      # it may hurt performance in most scenarios. Currently all PRE_ tests are
      # partitioned into the last shard. Unless the number of PRE_ tests are
      # larger than the partition size, the PRE_ test may get assigned into a
      # different shard and cause test failure.
      if (last_partition_size + test_count > partition_size
          and last_partition_size > 0):
        num_desired_partitions -= 1
        if num_desired_partitions <= 0:
          # Too many tests for number of partitions, just fill all partitions
          # beyond num_desired_partitions.
          partition_size = max_partition_size
        else:
          # Re-balance remaining partitions.
          partition_size = min(num_not_yet_allocated // num_desired_partitions,
                               max_partition_size)
        partitions.append([])
        partitions[-1].append(test)
        last_partition_size = test_count
      else:
        partitions[-1].append(test)
        last_partition_size += test_count

      num_not_yet_allocated -= test_count

    if not partitions[-1]:
      partitions.pop()
    return partitions

  def _CountTestsIndividually(self, test):
    # pylint: disable=no-self-use
    if not isinstance(test, list):
      return False
    annotations = test[0]['annotations']
    # UnitTests tests are really fast, so to balance shards better, count
    # UnitTests Batches as single tests.
    return ('Batch' not in annotations
            or annotations['Batch']['value'] != 'UnitTests')

  def _CreateShardsForDevices(self, tests):
    raise NotImplementedError

  def _GetUniqueTestName(self, test):
    # pylint: disable=no-self-use
    return test

  def _ShouldRetry(self, test, result):
    # pylint: disable=no-self-use,unused-argument
    return True

  #override
  def GetTestsForListing(self):
    ret = self._GetTests()
    ret = FlattenTestList(ret)
    ret.sort()
    return ret

  def _GetTests(self):
    raise NotImplementedError

  def _GroupTests(self, tests):
    # pylint: disable=no-self-use
    return tests

  def _GroupTestsAfterSharding(self, tests):
    # pylint: disable=no-self-use
    return tests

  def _AppendPreTestsForRetry(self, failed_tests, tests):
    # pylint: disable=no-self-use,unused-argument
    return failed_tests

  def _RunTest(self, device, test):
    raise NotImplementedError

  def _ShouldShardTestsForDevices(self):
    raise NotImplementedError


def FlattenTestList(values):
  """Returns a list with all nested lists (shard groupings) expanded."""
  ret = []
  for v in values:
    if isinstance(v, list):
      ret += v
    else:
      ret.append(v)
  return ret


def SetAppCompatibilityFlagsIfNecessary(packages, device):
  """Sets app compatibility flags on the given packages and device.

  Args:
    packages: A list of strings containing package names to apply flags to.
    device: A DeviceUtils instance to apply the flags on.
  """

  def set_flag_for_packages(flag, enable):
    enable_str = 'enable' if enable else 'disable'
    for p in packages:
      cmd = ['am', 'compat', enable_str, flag, p]
      device.RunShellCommand(cmd)

  sdk_version = device.build_version_sdk
  if sdk_version >= version_codes.R:
    # These flags are necessary to use the legacy storage permissions on R+.
    # See crbug.com/1173699 for more information.
    set_flag_for_packages('DEFAULT_SCOPED_STORAGE', False)
    set_flag_for_packages('FORCE_ENABLE_SCOPED_STORAGE', False)


class NoTestsError(Exception):
  """Error for when no tests are found."""