# Copyright 2014 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import fnmatch
import hashlib
import logging
import posixpath
import signal
try:
  import _thread as thread
except ImportError:
  import thread
import threading

from devil import base_error
from devil.android import crash_handler
from devil.android import device_errors
from devil.android.sdk import version_codes
from devil.android.tools import device_recovery
from devil.utils import signal_handler
from pylib import valgrind_tools
from pylib.base import base_test_result
from pylib.base import test_run
from pylib.base import test_collection
from pylib.local.device import local_device_environment


_SIGTERM_TEST_LOG = (
    '  Suite execution terminated, probably due to swarming timeout.\n'
    '  Your test may not have run.')


def SubstituteDeviceRoot(device_path, device_root):
  """Returns |device_path| with falsy parts replaced by |device_root|.

  Args:
    device_path: A string path, a list of path components, or a falsy value.
        When a list, each falsy component is substituted with |device_root|
        and the components are joined with posixpath.join.
    device_root: The device root path used for substitution.

  Returns:
    A device path string.
  """
  if not device_path:
    return device_root
  if isinstance(device_path, list):
    return posixpath.join(*(p if p else device_root for p in device_path))
  return device_path


class TestsTerminated(Exception):
  """Raised to abort test execution, e.g. on SIGTERM."""


class InvalidShardingSettings(Exception):

  def __init__(self, shard_index, total_shards):
    super().__init__(
        'Invalid sharding settings. shard_index: %d total_shards: %d' %
        (shard_index, total_shards))


class LocalDeviceTestRun(test_run.TestRun):
  """Base class for running a suite of tests on locally attached devices."""

  def __init__(self, env, test_instance):
    super().__init__(env, test_instance)
    # Maps str(device) -> valgrind tool instance. Populated lazily by GetTool.
    self._tools = {}
    # This is intended to be filled by a child class.
    self._installed_packages = []
    env.SetPreferredAbis(test_instance.GetPreferredAbis())

  #override
  def RunTests(self, results, raw_logs_fh=None):
    """Runs all tests, retrying failures up to the environment's max_tries.

    Args:
      results: A list to which one TestRunResults instance is appended per
          try attempt.
      raw_logs_fh: Unused here; accepted for interface compatibility.
    """
    tests = self._GetTests()

    exit_now = threading.Event()

    @local_device_environment.handle_shard_failures
    def run_tests_on_device(dev, tests, results):
      # This is performed here instead of during setup because restarting the
      # device clears app compatibility flags, which will happen if a device
      # needs to be recovered.
      SetAppCompatibilityFlagsIfNecessary(self._installed_packages, dev)
      consecutive_device_errors = 0
      for test in tests:
        if not test:
          logging.warning('No tests in shard. Continuing.')
          tests.test_completed()
          continue
        # NOTE: isSet() is a deprecated alias (removed in newer Pythons);
        # is_set() is the supported spelling.
        if exit_now.is_set():
          thread.exit()

        result = None
        rerun = None
        try:
          result, rerun = crash_handler.RetryOnSystemCrash(
              lambda d, t=test: self._RunTest(d, t),
              device=dev)
          consecutive_device_errors = 0
          if isinstance(result, base_test_result.BaseTestResult):
            results.AddResult(result)
          elif isinstance(result, list):
            results.AddResults(result)
          else:
            raise Exception(
                'Unexpected result type: %s' % type(result).__name__)
        except device_errors.CommandTimeoutError:
          # Test timeouts don't count as device errors for the purpose
          # of bad device detection.
          consecutive_device_errors = 0

          if isinstance(test, list):
            results.AddResults(
                base_test_result.BaseTestResult(
                    self._GetUniqueTestName(t),
                    base_test_result.ResultType.TIMEOUT) for t in test)
          else:
            results.AddResult(
                base_test_result.BaseTestResult(
                    self._GetUniqueTestName(test),
                    base_test_result.ResultType.TIMEOUT))
        except device_errors.DeviceUnreachableError:
          # If the device is no longer reachable then terminate this
          # run_tests_on_device call.
          raise
        except base_error.BaseError:
          # If we get a device error but believe the device is still
          # reachable, attempt to continue using it.
          if isinstance(tests, test_collection.TestCollection):
            rerun = test

          consecutive_device_errors += 1
          if consecutive_device_errors >= 3:
            # We believe the device is still reachable and may still be usable,
            # but if it fails repeatedly, we shouldn't attempt to keep using
            # it.
            logging.error('Repeated failures on device %s. Abandoning.',
                          str(dev))
            raise

          logging.exception(
              'Attempting to continue using device %s despite failure (%d/3).',
              str(dev), consecutive_device_errors)

        finally:
          if isinstance(tests, test_collection.TestCollection):
            if rerun:
              tests.add(rerun)
            tests.test_completed()

      logging.info('Finished running tests on this device.')

    def stop_tests(_signum, _frame):
      logging.critical('Received SIGTERM. Stopping test execution.')
      exit_now.set()
      raise TestsTerminated()

    try:
      with signal_handler.AddSignalHandler(signal.SIGTERM, stop_tests):
        self._env.ResetCurrentTry()
        while self._env.current_try < self._env.max_tries and tests:
          tries = self._env.current_try
          grouped_tests = self._GroupTests(tests)
          logging.info('STARTING TRY #%d/%d', tries + 1, self._env.max_tries)
          if tries > 0 and self._env.recover_devices:
            if any(d.build_version_sdk == version_codes.LOLLIPOP_MR1
                   for d in self._env.devices):
              logging.info(
                  'Attempting to recover devices due to known issue on L MR1. '
                  'See crbug.com/787056 for details.')
              self._env.parallel_devices.pMap(
                  device_recovery.RecoverDevice, None)
            elif tries + 1 == self._env.max_tries:
              logging.info(
                  'Attempting to recover devices prior to last test attempt.')
              self._env.parallel_devices.pMap(
                  device_recovery.RecoverDevice, None)
          logging.info('Will run %d tests on %d devices: %s',
                       len(tests), len(self._env.devices),
                       ', '.join(str(d) for d in self._env.devices))
          for t in tests:
            logging.debug('  %s', t)

          try_results = base_test_result.TestRunResults()
          test_names = (self._GetUniqueTestName(t) for t in tests)
          try_results.AddResults(
              base_test_result.BaseTestResult(
                  t, base_test_result.ResultType.NOTRUN)
              for t in test_names if not t.endswith('*'))

          # As soon as we know the names of the tests, we populate |results|.
          # The tests in try_results will have their results updated by
          # try_results.AddResult() as they are run.
          results.append(try_results)

          try:
            if self._ShouldShardTestsForDevices():
              tc = test_collection.TestCollection(
                  self._CreateShardsForDevices(grouped_tests))
              self._env.parallel_devices.pMap(
                  run_tests_on_device, tc, try_results).pGet(None)
            else:
              self._env.parallel_devices.pMap(run_tests_on_device,
                                              grouped_tests,
                                              try_results).pGet(None)
          except TestsTerminated:
            for unknown_result in try_results.GetUnknown():
              try_results.AddResult(
                  base_test_result.BaseTestResult(
                      unknown_result.GetName(),
                      base_test_result.ResultType.TIMEOUT,
                      log=_SIGTERM_TEST_LOG))
            raise

          self._env.IncrementCurrentTry()
          tests = self._GetTestsToRetry(tests, try_results)

          logging.info('FINISHED TRY #%d/%d', tries + 1, self._env.max_tries)
          if tests:
            logging.info('%d failed tests remain.', len(tests))
          else:
            logging.info('All tests completed.')
    except TestsTerminated:
      pass

  def _GetTestsToRetry(self, tests, try_results):
    """Returns the tests from |tests| that should be run again.

    A test is considered failed if it has no recorded result or a result
    other than PASS or SKIP. Wildcard test names (ending in '*') are matched
    against all recorded result names with fnmatch. _ShouldRetry has the
    final say on each candidate.
    """

    def is_failure_result(test_result):
      if isinstance(test_result, list):
        return any(is_failure_result(r) for r in test_result)
      return (
          test_result is None
          or test_result.GetType() not in (
              base_test_result.ResultType.PASS,
              base_test_result.ResultType.SKIP))

    all_test_results = {r.GetName(): r for r in try_results.GetAll()}

    tests_and_names = ((t, self._GetUniqueTestName(t)) for t in tests)

    tests_and_results = {}
    for test, name in tests_and_names:
      if name.endswith('*'):
        tests_and_results[name] = (test, [
            r for n, r in all_test_results.items() if fnmatch.fnmatch(n, name)
        ])
      else:
        tests_and_results[name] = (test, all_test_results.get(name))

    failed_tests_and_results = ((test, result)
                                for test, result in tests_and_results.values()
                                if is_failure_result(result))

    return [t for t, r in failed_tests_and_results if self._ShouldRetry(t, r)]

  def _ApplyExternalSharding(self, tests, shard_index, total_shards):
    """Returns the subset of |tests| assigned to shard |shard_index|.

    Tests are sorted by hash, grouped, and partitioned approximately evenly
    across |total_shards| shards; groups are flattened in the returned list.

    Raises:
      InvalidShardingSettings: If the sharding settings are inconsistent.
    """
    logging.info('Using external sharding settings. This is shard %d/%d',
                 shard_index, total_shards)

    if total_shards < 0 or shard_index < 0 or total_shards <= shard_index:
      raise InvalidShardingSettings(shard_index, total_shards)

    sharded_tests = []

    # Sort tests by hash.
    # TODO(crbug.com/1257820): Add sorting logic back to _PartitionTests.
    tests = self._SortTests(tests)

    # Group tests by tests that should run in the same test invocation - either
    # unit tests or batched tests.
    grouped_tests = self._GroupTests(tests)

    # Partition grouped tests approximately evenly across shards.
    partitioned_tests = self._PartitionTests(grouped_tests, total_shards,
                                             float('inf'))
    if len(partitioned_tests) <= shard_index:
      return []
    for t in partitioned_tests[shard_index]:
      if isinstance(t, list):
        sharded_tests.extend(t)
      else:
        sharded_tests.append(t)
    return sharded_tests

  # Sort by hash so we don't put all tests in a slow suite in the same
  # partition.
  def _SortTests(self, tests):
    return sorted(tests,
                  key=lambda t: hashlib.sha256(
                      self._GetUniqueTestName(t[0] if isinstance(t, list) else t
                                              ).encode()).hexdigest())

  # Partition tests evenly into |num_desired_partitions| partitions where
  # possible. However, many constraints make partitioning perfectly impossible.
  # If the max_partition_size isn't large enough, extra partitions may be
  # created (infinite max size should always return precisely the desired
  # number of partitions). Even if the |max_partition_size| is technically large
  # enough to hold all of the tests in |num_desired_partitions|, we attempt to
  # keep test order relatively stable to minimize flakes, so when tests are
  # grouped (eg. batched tests), we cannot perfectly fill all paritions as that
  # would require breaking up groups.
  def _PartitionTests(self, tests, num_desired_partitions, max_partition_size):
    # pylint: disable=no-self-use
    partitions = []

    num_not_yet_allocated = sum(
        [len(test) - 1 for test in tests if self._CountTestsIndividually(test)])
    num_not_yet_allocated += len(tests)

    # Fast linear partition approximation capped by max_partition_size. We
    # cannot round-robin or otherwise re-order tests dynamically because we want
    # test order to remain stable.
    partition_size = min(num_not_yet_allocated // num_desired_partitions,
                         max_partition_size)
    partitions.append([])
    last_partition_size = 0
    for test in tests:
      test_count = len(test) if self._CountTestsIndividually(test) else 1
      # Make a new shard whenever we would overfill the previous one. However,
      # if the size of the test group is larger than the max partition size on
      # its own, just put the group in its own shard instead of splitting up the
      # group.
      if (last_partition_size + test_count > partition_size
          and last_partition_size > 0):
        num_desired_partitions -= 1
        if num_desired_partitions <= 0:
          # Too many tests for number of partitions, just fill all partitions
          # beyond num_desired_partitions.
          partition_size = max_partition_size
        else:
          # Re-balance remaining partitions.
          partition_size = min(num_not_yet_allocated // num_desired_partitions,
                               max_partition_size)
        partitions.append([])
        partitions[-1].append(test)
        last_partition_size = test_count
      else:
        partitions[-1].append(test)
        last_partition_size += test_count

      num_not_yet_allocated -= test_count

    if not partitions[-1]:
      partitions.pop()
    return partitions

  def _CountTestsIndividually(self, test):
    """Returns True if each test in a group should count toward shard size."""
    # pylint: disable=no-self-use
    if not isinstance(test, list):
      return False
    annotations = test[0]['annotations']
    # UnitTests tests are really fast, so to balance shards better, count
    # UnitTests Batches as single tests.
    return ('Batch' not in annotations
            or annotations['Batch']['value'] != 'UnitTests')

  def GetTool(self, device):
    """Returns the (cached) valgrind tool instance for |device|."""
    if str(device) not in self._tools:
      self._tools[str(device)] = valgrind_tools.CreateTool(
          self._env.tool, device)
    return self._tools[str(device)]

  def _CreateShardsForDevices(self, tests):
    raise NotImplementedError

  def _GetUniqueTestName(self, test):
    # pylint: disable=no-self-use
    return test

  def _ShouldRetry(self, test, result):
    # pylint: disable=no-self-use,unused-argument
    return True

  #override
  def GetTestsForListing(self):
    ret = self._GetTests()
    ret = FlattenTestList(ret)
    ret.sort()
    return ret

  def _GetTests(self):
    raise NotImplementedError

  def _GroupTests(self, tests):
    # pylint: disable=no-self-use
    return tests

  def _RunTest(self, device, test):
    raise NotImplementedError

  def _ShouldShardTestsForDevices(self):
    raise NotImplementedError


def FlattenTestList(values):
  """Returns a list with all nested lists (shard groupings) expanded."""
  ret = []
  for v in values:
    if isinstance(v, list):
      ret += v
    else:
      ret.append(v)
  return ret


def SetAppCompatibilityFlagsIfNecessary(packages, device):
  """Sets app compatibility flags on the given packages and device.

  Args:
    packages: A list of strings containing package names to apply flags to.
    device: A DeviceUtils instance to apply the flags on.
  """

  def set_flag_for_packages(flag, enable):
    enable_str = 'enable' if enable else 'disable'
    for p in packages:
      cmd = ['am', 'compat', enable_str, flag, p]
      device.RunShellCommand(cmd)

  sdk_version = device.build_version_sdk
  if sdk_version >= version_codes.R:
    # These flags are necessary to use the legacy storage permissions on R+.
    # See crbug.com/1173699 for more information.
    set_flag_for_packages('DEFAULT_SCOPED_STORAGE', False)
    set_flag_for_packages('FORCE_ENABLE_SCOPED_STORAGE', False)


class NoTestsError(Exception):
  """Error for when no tests are found."""