# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Dispatches tests, either sharding or replicating them.

Performs the following steps:
* Create a test collection factory, using the given tests
  - If sharding: test collection factory returns the same shared test collection
    to all test runners
  - If replicating: test collection factory returns a unique test collection to
    each test runner, with the same set of tests in each.
* Create a test runner for each device.
* Run each test runner in its own thread, grabbing tests from the test
  collection until there are no tests left.
"""

import collections
import logging
import threading

from pylib import android_commands
from pylib import constants
from pylib.base import base_test_result
from pylib.device import device_errors
from pylib.utils import reraiser_thread
from pylib.utils import watchdog_timer


DEFAULT_TIMEOUT = 7 * 60  # seven minutes


class _ThreadSafeCounter(object):
  """A threadsafe counter."""

  def __init__(self):
    self._lock = threading.Lock()
    self._value = 0

  def GetAndIncrement(self):
    """Get the current value and increment it atomically.

    Returns:
      The value before incrementing.
    """
    with self._lock:
      pre_increment = self._value
      self._value += 1
      return pre_increment


class _Test(object):
  """Holds a test with additional metadata."""

  def __init__(self, test, tries=0):
    """Initializes the _Test object.

    Args:
      test: The test.
      tries: Number of tries so far.
    """
    self.test = test
    self.tries = tries


class _TestCollection(object):
  """A threadsafe collection of tests.

  Tests are handed out in the order they were added. A test counts as "in
  progress" from the moment it is added until test_completed() is called for
  it; iteration ends once that count drops back to zero.

  Args:
    tests: List of tests to put in the collection.
  """

  def __init__(self, tests=None):
    if not tests:
      tests = []
    self._lock = threading.Lock()
    # A deque gives O(1) removal from the front of the queue.
    self._tests = collections.deque()
    # Number of tests added but not yet reported via test_completed().
    self._tests_in_progress = 0
    # Used to signal that an item is available or all items have been handled.
    self._item_available_or_all_done = threading.Event()
    for t in tests:
      self.add(t)

  def _pop(self):
    """Pop a test from the collection.

    Waits until a test is available or all tests have been handled.

    NOTE: the event is only ever set by add() and test_completed(), so on a
    collection that has never had a test added this call blocks until one is.

    Returns:
      A test or None if all tests have been handled.
    """
    while True:
      # Wait for a test to be available or all tests to have been handled.
      self._item_available_or_all_done.wait()
      with self._lock:
        # Check which of the two conditions triggered the signal.
        if self._tests_in_progress == 0:
          return None
        try:
          return self._tests.popleft()
        except IndexError:
          # Another thread beat us to the available test, wait again.
          self._item_available_or_all_done.clear()

  def add(self, test):
    """Add a test to the collection.

    Args:
      test: A test to add.
    """
    with self._lock:
      self._tests.append(test)
      self._item_available_or_all_done.set()
      self._tests_in_progress += 1

  def test_completed(self):
    """Indicate that a test has been fully handled."""
    with self._lock:
      self._tests_in_progress -= 1
      if self._tests_in_progress == 0:
        # All tests have been handled, signal all waiting threads.
        self._item_available_or_all_done.set()

  def __iter__(self):
    """Iterate through tests in the collection until all have been handled."""
    while True:
      r = self._pop()
      if r is None:
        break
      yield r

  def __len__(self):
    """Return the number of tests currently in the collection.

    Only tests still waiting to be handed out are counted; tests that have
    been popped but not yet completed are not.
    """
    # Take the lock for consistency with every other accessor of _tests.
    with self._lock:
      return len(self._tests)

  def test_names(self):
    """Return a list of the names of the tests currently in the collection."""
    with self._lock:
      return [t.test for t in self._tests]


def _RunTestsFromQueue(runner, test_collection, out_results, watcher,
                       num_retries, tag_results_with_device=False):
  """Runs tests from the test_collection until empty using the given runner.

  Adds TestRunResults objects to the out_results list and may add tests back
  to test_collection to be retried.

  Args:
    runner: A TestRunner object used to run the tests.
    test_collection: A _TestCollection from which to get _Test objects to run.
    out_results: A list to add TestRunResults to.
    watcher: A watchdog_timer.WatchdogTimer object, used as a shared timeout.
    num_retries: Number of retries for a test.
    tag_results_with_device: If True, appends the name of the device on which
        the test was run to the test name. Used when replicating to identify
        which device ran each copy of the test, and to ensure each copy of the
        test is recorded separately.

  Raises:
    device_errors.DeviceUnreachableError: If the runner's device is no longer
        listed by android_commands.GetAttachedDevices().
    Any exception raised by runner.RunTest() is re-raised after the current
    test has been put back into the collection.
  """

  def TagTestRunResults(test_run_results):
    """Tags all results with the last 4 digits of the device id.

    Used when replicating tests to distinguish the same tests run on different
    devices. We use a set to store test results, so the hash (generated from
    name and tag) must be unique to be considered different results.
    """
    new_test_run_results = base_test_result.TestRunResults()
    for test_result in test_run_results.GetAll():
      test_result.SetName('%s_%s' % (runner.device_serial[-4:],
                                     test_result.GetName()))
      new_test_run_results.AddResult(test_result)
    return new_test_run_results

  for test in test_collection:
    # Each test gets a fresh watchdog window.
    watcher.Reset()
    try:
      if runner.device_serial not in android_commands.GetAttachedDevices():
        # Device is unresponsive, stop handling tests on this device.
        msg = 'Device %s is unresponsive.' % runner.device_serial
        logging.warning(msg)
        raise device_errors.DeviceUnreachableError(msg)
      result, retry = runner.RunTest(test.test)
      if tag_results_with_device:
        result = TagTestRunResults(result)
      test.tries += 1
      if retry and test.tries <= num_retries:
        # Retry non-passing results, only record passing results.
        pass_results = base_test_result.TestRunResults()
        pass_results.AddResults(result.GetPass())
        out_results.append(pass_results)
        logging.warning('Will retry test, try #%s.', test.tries)
        test_collection.add(_Test(test=retry, tries=test.tries))
      else:
        # All tests passed or retry limit reached. Either way, record results.
        out_results.append(result)
    except:
      # Deliberately bare: whatever went wrong is always re-raised below, and
      # the test must be handed back first so another device can run it. The
      # exception then surfaces on the main thread.
      test_collection.add(test)
      raise
    finally:
      # Retries count as separate tasks so always mark the popped test as done.
      test_collection.test_completed()


def _SetUp(runner_factory, device, out_runners, threadsafe_counter):
  """Creates a test runner for a single device and calls SetUp() on it.

  Intended to be run on one thread per device (see _CreateRunners).

  Note: if a device is unresponsive the corresponding TestRunner will not be
  added to out_runners.

  Args:
    runner_factory: Callable that takes a device and index and returns a
        TestRunner object.
    device: The device serial number to set up.
    out_runners: List to add the successfully set up TestRunner object.
    threadsafe_counter: A _ThreadSafeCounter object used to get shard indices.
  """
  try:
    index = threadsafe_counter.GetAndIncrement()
    logging.warning('Creating shard %s for device %s.', index, device)
    runner = runner_factory(device, index)
    runner.SetUp()
    out_runners.append(runner)
  except (device_errors.DeviceUnreachableError,
          # TODO(jbudorick) Remove this once the underlying implementations
          #     for the above are switched or wrapped.
          android_commands.errors.DeviceUnresponsiveError) as e:
    logging.warning('Failed to create shard for %s: [%s]', device, e)


def _RunAllTests(runners, test_collection_factory, num_retries, timeout=None,
                 tag_results_with_device=False):
  """Run all tests using the given TestRunners.

  Args:
    runners: A list of TestRunner objects.
    test_collection_factory: A callable to generate a _TestCollection object for
        each test runner.
    num_retries: Number of retries for a test.
    timeout: Watchdog timeout in seconds. Passed straight to
        watchdog_timer.WatchdogTimer, which defines what None means.
    tag_results_with_device: If True, appends the name of the device on which
        the test was run to the test name. Used when replicating to identify
        which device ran each copy of the test, and to ensure each copy of the
        test is recorded separately.

  Returns:
    A tuple of (TestRunResults object, exit code)
  """
  logging.warning('Running tests with %s test runners.', len(runners))
  results = []
  exit_code = 0
  run_results = base_test_result.TestRunResults()
  watcher = watchdog_timer.WatchdogTimer(timeout)
  test_collections = [test_collection_factory() for _ in runners]

  threads = [
      reraiser_thread.ReraiserThread(
          _RunTestsFromQueue,
          [r, tc, results, watcher, num_retries, tag_results_with_device],
          name=r.device_serial[-4:])
      for r, tc in zip(runners, test_collections)]

  workers = reraiser_thread.ReraiserThreadGroup(threads)
  workers.StartAll()

  # Catch DeviceUnreachableErrors and set a warning exit code
  try:
    workers.JoinAll(watcher)
  except (device_errors.DeviceUnreachableError,
          # TODO(jbudorick) Remove this once the underlying implementations
          #     for the above are switched or wrapped.
          android_commands.errors.DeviceUnresponsiveError) as e:
    logging.error(e)
    exit_code = constants.WARNING_EXIT_CODE

  # Anything still sitting in a collection was never handed to a runner;
  # record those tests as UNKNOWN so they show up in the results.
  if any(len(tc) for tc in test_collections):
    logging.error('Only ran %d tests (all devices are likely offline).',
                  len(results))
    for tc in test_collections:
      run_results.AddResults(base_test_result.BaseTestResult(
          t, base_test_result.ResultType.UNKNOWN) for t in tc.test_names())

  for r in results:
    run_results.AddTestRunResults(r)
  if not run_results.DidRunPass():
    exit_code = constants.ERROR_EXIT_CODE
  return (run_results, exit_code)


def _CreateRunners(runner_factory, devices, timeout=None):
  """Creates a test runner for each device and calls SetUp() in parallel.

  Note: if a device is unresponsive the corresponding TestRunner will not be
  included in the returned list.

  Args:
    runner_factory: Callable that takes a device and index and returns a
        TestRunner object.
    devices: List of device serial numbers as strings.
    timeout: Watchdog timeout in seconds. Passed straight to
        watchdog_timer.WatchdogTimer, which defines what None means.

  Returns:
    A list of TestRunner objects.
  """
  logging.warning('Creating %s test runners.', len(devices))
  runners = []
  counter = _ThreadSafeCounter()
  threads = reraiser_thread.ReraiserThreadGroup(
      [reraiser_thread.ReraiserThread(_SetUp,
                                      [runner_factory, d, runners, counter],
                                      name=d[-4:])
       for d in devices])
  threads.StartAll()
  threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))
  return runners


def _TearDownRunners(runners, timeout=None):
  """Calls TearDown() for each test runner in parallel.

  Args:
    runners: A list of TestRunner objects.
    timeout: Watchdog timeout in seconds. Passed straight to
        watchdog_timer.WatchdogTimer, which defines what None means.
  """
  threads = reraiser_thread.ReraiserThreadGroup(
      [reraiser_thread.ReraiserThread(r.TearDown, name=r.device_serial[-4:])
       for r in runners])
  threads.StartAll()
  threads.JoinAll(watchdog_timer.WatchdogTimer(timeout))


def ApplyMaxPerRun(tests, max_per_run):
  """Rearrange the tests so that no group contains more than max_per_run tests.

  A string entry is treated as a ':'-separated group of tests and is split
  into consecutive groups of at most max_per_run tests each. Entries that are
  not strings are passed through untouched.

  Args:
    tests: Iterable of test entries (strings or arbitrary test objects).
    max_per_run: Maximum number of ':'-separated tests per resulting group.
        Must be at least 1 whenever a string entry has to be split.

  Returns:
    A list of tests with no more than max_per_run per run.

  Raises:
    ValueError: If a string entry is encountered and max_per_run is less
        than 1.
  """
  tests_expanded = []
  for test_group in tests:
    if not isinstance(test_group, str):
      # Do not split test objects which are not strings.
      tests_expanded.append(test_group)
      continue
    if max_per_run < 1:
      # A negative step would make range() empty and silently drop the whole
      # group; fail loudly instead.
      raise ValueError(
          'max_per_run must be a positive integer, got %r' % (max_per_run,))
    test_split = test_group.split(':')
    tests_expanded.extend(
        ':'.join(test_split[i:i + max_per_run])
        for i in range(0, len(test_split), max_per_run))
  return tests_expanded


def RunTests(tests, runner_factory, devices, shard=True,
             test_timeout=DEFAULT_TIMEOUT, setup_timeout=DEFAULT_TIMEOUT,
             num_retries=2, max_per_run=256):
  """Run all tests on attached devices, retrying tests that don't pass.

  Args:
    tests: List of tests to run.
    runner_factory: Callable that takes a device and index and returns a
        TestRunner object.
    devices: List of attached devices.
    shard: True if we should shard, False if we should replicate tests.
      - Sharding tests will distribute tests across all test runners through a
        shared test collection.
      - Replicating tests will copy all tests to each test runner through a
        unique test collection for each test runner.
    test_timeout: Watchdog timeout in seconds for running tests.
    setup_timeout: Watchdog timeout in seconds for creating and cleaning up
        test runners.
    num_retries: Number of retries for a test.
    max_per_run: Maximum number of tests to run in any group.

  Returns:
    A tuple of (base_test_result.TestRunResults object, exit code). If there
    are no tests, or no test runner could be created, the results object is
    empty and the exit code is constants.ERROR_EXIT_CODE.
  """
  if not tests:
    logging.critical('No tests to run.')
    return (base_test_result.TestRunResults(), constants.ERROR_EXIT_CODE)

  tests_expanded = ApplyMaxPerRun(tests, max_per_run)
  if shard:
    # Generate a shared _TestCollection object for all test runners, so they
    # draw from a common pool of tests.
    shared_test_collection = _TestCollection([_Test(t) for t in tests_expanded])
    test_collection_factory = lambda: shared_test_collection
    tag_results_with_device = False
    log_string = 'sharded across devices'
  else:
    # Generate a unique _TestCollection object for each test runner, but use
    # the same set of tests.
    test_collection_factory = lambda: _TestCollection(
        [_Test(t) for t in tests_expanded])
    tag_results_with_device = True
    log_string = 'replicated on each device'

  logging.info('Will run %d tests (%s): %s',
               len(tests_expanded), log_string, str(tests_expanded))
  runners = _CreateRunners(runner_factory, devices, setup_timeout)
  if not runners:
    # With no runners _RunAllTests would start no threads and create no test
    # collections, so it would report on an empty result set without any test
    # having run. Treat that as a failure, like the "no tests" case above.
    logging.critical('No test runners could be created; no tests were run.')
    return (base_test_result.TestRunResults(), constants.ERROR_EXIT_CODE)

  try:
    return _RunAllTests(runners, test_collection_factory,
                        num_retries, test_timeout, tag_results_with_device)
  finally:
    # TearDown is best effort: failures are logged but never mask the result
    # (or the exception) coming out of _RunAllTests.
    try:
      _TearDownRunners(runners, setup_timeout)
    except (device_errors.DeviceUnreachableError,
            # TODO(jbudorick) Remove this once the underlying implementations
            #     for the above are switched or wrapped.
            android_commands.errors.DeviceUnresponsiveError) as e:
      logging.warning('Device unresponsive during TearDown: [%s]', e)
    except Exception as e:
      logging.error('Unexpected exception caught during TearDown: %s', e)