1# Copyright (C) 2011 Google Inc. All rights reserved. 2# 3# Redistribution and use in source and binary forms, with or without 4# modification, are permitted provided that the following conditions are 5# met: 6# 7# * Redistributions of source code must retain the above copyright 8# notice, this list of conditions and the following disclaimer. 9# * Redistributions in binary form must reproduce the above 10# copyright notice, this list of conditions and the following disclaimer 11# in the documentation and/or other materials provided with the 12# distribution. 13# * Neither the name of Google Inc. nor the names of its 14# contributors may be used to endorse or promote products derived from 15# this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 18# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 19# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 20# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 21# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 22# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 23# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 24# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 25# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 26# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 27# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
import logging
import math
import threading
import time

from webkitpy.common import message_pool
from webkitpy.layout_tests.controllers import single_test_runner
from webkitpy.layout_tests.models.test_run_results import TestRunResults
from webkitpy.layout_tests.models import test_expectations
from webkitpy.layout_tests.models import test_failures
from webkitpy.layout_tests.models import test_results
from webkitpy.tool import grammar


_log = logging.getLogger(__name__)


TestExpectations = test_expectations.TestExpectations

# Export this so callers don't need to know about message pools.
WorkerException = message_pool.WorkerException


class TestRunInterruptedException(Exception):
    """Raised when a test run should be stopped immediately."""
    def __init__(self, reason):
        Exception.__init__(self)
        self.reason = reason
        self.msg = reason

    def __reduce__(self):
        # Rebuild the exception from |reason| alone when it is pickled.
        return self.__class__, (self.reason,)


class LayoutTestRunner(object):
    """Shards a list of test inputs, runs the shards on a pool of Workers and
    collects the results into a TestRunResults object."""

    def __init__(self, options, port, printer, results_directory, test_is_slow_fn):
        """Store the collaborators used by run_tests().

        Args:
            options: parsed command-line options; this class reads
                max_locked_shards, child_processes, fully_parallel, dry_run,
                exit_after_n_failures, exit_after_n_crashes_or_timeouts and
                pixel_tests from it.
            port: port object; provides split_test() and host.filesystem.
            printer: object used to report progress.
            results_directory: directory handed to the workers for results.
            test_is_slow_fn: callable taking a test name; its return value is
                passed to TestRunResults.add() as the "slow" flag.
        """
        self._options = options
        self._port = port
        self._printer = printer
        self._results_directory = results_directory
        self._test_is_slow = test_is_slow_fn
        self._sharder = Sharder(self._port.split_test, self._options.max_locked_shards)
        self._filesystem = self._port.host.filesystem

        self._expectations = None
        self._test_inputs = []
        self._retrying = False
        # Initialized here as well as in run_tests() so that
        # _handle_device_failed() never hits a missing attribute.
        self._shards_to_redo = []

        self._current_run_results = None

    def run_tests(self, expectations, test_inputs, tests_to_skip, num_workers, retrying):
        """Shard and run |test_inputs| and return the resulting TestRunResults.

        Args:
            expectations: expectations object used to classify every result.
            test_inputs: list of test inputs to run.
            tests_to_skip: names of tests that are recorded as expected SKIP
                results without being run.
            num_workers: upper bound on the number of workers; it is capped at
                the number of shards.
            retrying: True for a retry pass. In that case the expected-result
                summary is not printed again and workers are given the
                'retries' subdirectory of the results directory.

        Returns:
            The TestRunResults for this run. When options.dry_run is set it is
            returned right after sharding, before any worker is started.

        TestRunInterruptedException and KeyboardInterrupt are not propagated;
        they set run_results.interrupted / run_results.keyboard_interrupted
        instead. Any other exception is logged and re-raised.
        """
        self._expectations = expectations
        self._test_inputs = test_inputs
        self._retrying = retrying
        self._shards_to_redo = []

        # FIXME: rename all variables to test_run_results or some such ...
        run_results = TestRunResults(self._expectations, len(test_inputs) + len(tests_to_skip))
        self._current_run_results = run_results
        self._printer.num_tests = len(test_inputs)
        self._printer.num_completed = 0

        if not retrying:
            self._printer.print_expected(run_results, self._expectations.get_tests_with_result_type)

        for test_name in set(tests_to_skip):
            result = test_results.TestResult(test_name)
            result.type = test_expectations.SKIP
            run_results.add(result, expected=True, test_is_slow=self._test_is_slow(test_name))

        self._printer.write_update('Sharding tests ...')
        locked_shards, unlocked_shards = self._sharder.shard_tests(
            test_inputs, int(self._options.child_processes), self._options.fully_parallel)

        # We don't have a good way to coordinate the workers so that they don't
        # try to run the shards that need a lock. The easiest solution is to
        # run all of the locked shards first.
        all_shards = locked_shards + unlocked_shards
        num_workers = min(num_workers, len(all_shards))
        self._printer.print_workers_and_shards(num_workers, len(all_shards), len(locked_shards))

        if self._options.dry_run:
            return run_results

        self._printer.write_update('Starting %s ...' % grammar.pluralize('worker', num_workers))

        start_time = time.time()
        try:
            with message_pool.get(self, self._worker_factory, num_workers, self._port.host) as pool:
                pool.run(('test_list', shard.name, shard.test_inputs) for shard in all_shards)

            if self._shards_to_redo:
                # Every shard queued for a redo reduces the number of workers
                # used for the second pass by one.
                num_workers -= len(self._shards_to_redo)
                if num_workers > 0:
                    with message_pool.get(self, self._worker_factory, num_workers, self._port.host) as pool:
                        pool.run(('test_list', shard.name, shard.test_inputs) for shard in self._shards_to_redo)
        except TestRunInterruptedException as e:
            _log.warning(e.reason)
            run_results.interrupted = True
        except KeyboardInterrupt:
            self._printer.flush()
            self._printer.writeln('Interrupted, exiting ...')
            run_results.keyboard_interrupted = True
        except Exception as e:
            _log.debug('%s("%s") raised, exiting', e.__class__.__name__, str(e))
            raise
        finally:
            run_results.run_time = time.time() - start_time

        return run_results

    def _worker_factory(self, worker_connection):
        """Create a Worker for |worker_connection|.

        During a retry pass the worker writes to the 'retries' subdirectory of
        the results directory, which is created here if necessary.
        """
        results_directory = self._results_directory
        if self._retrying:
            results_directory = self._filesystem.join(self._results_directory, 'retries')
            self._filesystem.maybe_make_directory(results_directory)
        return Worker(worker_connection, results_directory, self._options)

    def _mark_interrupted_tests_as_skipped(self, run_results):
        """Add an unexpected FailureEarlyExit result for every test input that
        has no entry in run_results.results_by_name yet."""
        for test_input in self._test_inputs:
            if test_input.test_name not in run_results.results_by_name:
                result = test_results.TestResult(test_input.test_name, [test_failures.FailureEarlyExit()])
                # FIXME: We probably need to loop here if there are multiple iterations.
                # FIXME: Also, these results are really neither expected nor unexpected. We probably
                # need a third type of result.
                run_results.add(result, expected=False, test_is_slow=self._test_is_slow(test_input.test_name))

    def _interrupt_if_at_failure_limits(self, run_results):
        """Raise TestRunInterruptedException once either of the configured
        failure limits (exit_after_n_failures,
        exit_after_n_crashes_or_timeouts) has been reached."""
        # Note: The messages in this method are constructed to match old-run-webkit-tests
        # so that existing buildbot grep rules work.
        def interrupt_if_at_failure_limit(limit, failure_count, run_results, message):
            # A falsy limit (None or 0) disables the check.
            if limit and failure_count >= limit:
                message += " %d tests run." % (run_results.expected + run_results.unexpected)
                self._mark_interrupted_tests_as_skipped(run_results)
                raise TestRunInterruptedException(message)

        interrupt_if_at_failure_limit(
            self._options.exit_after_n_failures,
            run_results.unexpected_failures,
            run_results,
            "Exiting early after %d failures." % run_results.unexpected_failures)
        interrupt_if_at_failure_limit(
            self._options.exit_after_n_crashes_or_timeouts,
            run_results.unexpected_crashes + run_results.unexpected_timeouts,
            run_results,
            # This differs from ORWT because it does not include WebProcess crashes.
            "Exiting early after %d crashes and %d timeouts." % (run_results.unexpected_crashes, run_results.unexpected_timeouts))

    def _update_summary_with_result(self, run_results, result):
        """Record |result| in |run_results|, print it, and stop the run if a
        failure limit has been reached.

        Results flagged device_failed are only printed (as "Aborted"); they are
        not added to |run_results|.
        """
        expected = self._expectations.matches_an_expected_result(
            result.test_name, result.type, self._options.pixel_tests or result.reftest_type)
        exp_str = self._expectations.get_expectations_string(result.test_name)
        got_str = self._expectations.expectation_to_string(result.type)

        if result.device_failed:
            self._printer.print_finished_test(result, False, exp_str, "Aborted")
            return

        run_results.add(result, expected, self._test_is_slow(result.test_name))
        self._printer.print_finished_test(result, expected, exp_str, got_str)
        self._interrupt_if_at_failure_limits(run_results)

    def handle(self, name, source, *args):
        """Dispatch the message |name| to the matching _handle_<name> method.

        Presumably called by the message pool for every message a worker
        posts (this object is passed to message_pool.get()); confirm against
        message_pool.

        Raises:
            AssertionError: if there is no handler for |name|.
        """
        # The None default matters: without it an unknown message raises
        # AttributeError here and the AssertionError below is unreachable.
        method = getattr(self, '_handle_' + name, None)
        if method:
            return method(source, *args)
        raise AssertionError('unknown message %s received from %s, args=%s' % (name, source, repr(args)))

    def _handle_started_test(self, worker_name, test_input, test_timeout_sec):
        """Report that a worker started running |test_input|."""
        self._printer.print_started_test(test_input.test_name)

    def _handle_finished_test_list(self, worker_name, list_name):
        """Nothing to do when a worker finishes a shard."""
        pass

    def _handle_finished_test(self, worker_name, result, log_messages=None):
        """Fold a finished test's |result| into the current run results.

        |log_messages| is accepted for compatibility and ignored.
        """
        self._update_summary_with_result(self._current_run_results, result)

    def _handle_device_failed(self, worker_name, list_name, remaining_tests):
        """Queue the tests a failed worker did not finish so that run_tests()
        can run them again in a second pass."""
        _log.warning("%s has failed", worker_name)
        if remaining_tests:
            self._shards_to_redo.append(TestShard(list_name, remaining_tests))


class Worker(object):
    """Runs the tests of the shards it is handed, one test at a time, and
    posts progress messages back through its caller connection."""

    def __init__(self, caller, results_directory, options):
        """Store the connection and configuration.

        Args:
            caller: connection object; provides worker_number, name, host,
                post() and stop_running().
            results_directory: directory passed to the single-test runner.
            options: parsed command-line options (platform and batch_size are
                read by this class).
        """
        self._caller = caller
        self._worker_number = caller.worker_number
        self._name = caller.name
        self._results_directory = results_directory
        self._options = options

        # The remaining fields are initialized in start()
        self._host = None
        self._port = None
        self._batch_size = None
        self._batch_count = None
        self._filesystem = None
        self._driver = None
        self._num_tests = 0

    def __del__(self):
        self.stop()

    def start(self):
        """This method is called when the object is starting to be used and it is safe
        for the object to create state that does not need to be pickled (usually this means
        it is called in a child process)."""
        self._host = self._caller.host
        self._filesystem = self._host.filesystem
        self._port = self._host.port_factory.get(self._options.platform, self._options)

        self._batch_count = 0
        self._batch_size = self._options.batch_size or 0

    def handle(self, name, source, test_list_name, test_inputs):
        """Run every test in |test_inputs| (the only supported message is
        'test_list').

        If a test reports a device failure, the tests from that one onwards
        are posted back in a 'device_failed' message and the worker asks its
        caller to stop running. Otherwise 'finished_test_list' is posted.
        """
        assert name == 'test_list'
        for i, test_input in enumerate(test_inputs):
            device_failed = self._run_test(test_input, test_list_name)
            if device_failed:
                # test_inputs[i:] includes the test that was running when the
                # device failed.
                self._caller.post('device_failed', test_list_name, test_inputs[i:])
                self._caller.stop_running()
                return

        self._caller.post('finished_test_list', test_list_name)

    def _update_test_input(self, test_input):
        """Fill in reference_files (lazily) and should_run_pixel_test."""
        if test_input.reference_files is None:
            # Lazy initialization.
            test_input.reference_files = self._port.reference_files(test_input.test_name)
        if test_input.reference_files:
            test_input.should_run_pixel_test = True
        else:
            test_input.should_run_pixel_test = self._port.should_run_as_pixel_test(test_input)

    def _run_test(self, test_input, shard_name):
        """Run one test and post 'started_test' / 'finished_test' messages.

        Returns:
            True if no driver could be created, otherwise the device_failed
            flag of the test result.
        """
        self._batch_count += 1

        stop_when_done = False
        if self._batch_size > 0 and self._batch_count >= self._batch_size:
            self._batch_count = 0
            stop_when_done = True

        self._update_test_input(test_input)
        test_timeout_sec = self._timeout(test_input)
        start = time.time()

        if self._driver and self._driver.has_crashed():
            self._kill_driver()
        if not self._driver:
            self._driver = self._port.create_driver(self._worker_number)

        if not self._driver:
            # FIXME: Is this the best way to handle a device crashing in the middle of the test, or should we create
            # a new failure type?
            return True

        self._caller.post('started_test', test_input, test_timeout_sec)
        result = single_test_runner.run_single_test(self._port, self._options, self._results_directory,
                                                    self._name, self._driver, test_input, stop_when_done)

        result.shard_name = shard_name
        result.worker_name = self._name
        result.total_run_time = time.time() - start
        result.test_number = self._num_tests
        self._num_tests += 1
        self._caller.post('finished_test', result)
        self._clean_up_after_test(test_input, result)
        return result.device_failed

    def stop(self):
        """Stop the driver, if there is one."""
        _log.debug("%s cleaning up", self._name)
        self._kill_driver()

    def _timeout(self, test_input):
        """Compute the appropriate timeout value for a test.

        Returns:
            Three times test_input.timeout, converted from milliseconds to
            seconds, as a float.
        """
        # The driver watchdog uses 2.5x the timeout; we want to be
        # larger than that. We also add a little more padding if we're
        # running tests in a separate thread.
        #
        # Note that we need to convert the test timeout from a
        # string value in milliseconds to a float for Python.

        # FIXME: Can we just return the test_input.timeout now?
        driver_timeout_sec = 3.0 * float(test_input.timeout) / 1000.0
        return driver_timeout_sec

    def _kill_driver(self):
        """Stop the current driver, if any, and forget about it."""
        # Be careful about how and when we kill the driver; if driver.stop()
        # raises an exception, this routine may get re-entered via __del__.
        driver = self._driver
        self._driver = None
        if driver:
            _log.debug("%s killing driver", self._name)
            driver.stop()

    def _clean_up_after_test(self, test_input, result):
        """Log the outcome of a test and restart the driver if a failure
        requires it."""
        test_name = test_input.test_name

        if result.failures:
            # Check and kill the driver if we need to.
            if any(f.driver_needs_restart() for f in result.failures):
                self._kill_driver()
                # Reset the batch count since the shell just bounced.
                self._batch_count = 0

            # Print the error message(s).
            _log.debug("%s %s failed:", self._name, test_name)
            for f in result.failures:
                _log.debug("%s  %s", self._name, f.message())
        elif result.type == test_expectations.SKIP:
            _log.debug("%s %s skipped", self._name, test_name)
        else:
            _log.debug("%s %s passed", self._name, test_name)


class TestShard(object):
    """A test shard is a named list of TestInputs."""

    def __init__(self, name, test_inputs):
        """|test_inputs| must be non-empty: the shard takes its requires_lock
        flag from the first input."""
        self.name = name
        self.test_inputs = test_inputs
        self.requires_lock = test_inputs[0].requires_lock

    def __repr__(self):
        return "TestShard(name='%s', test_inputs=%s, requires_lock=%s)" % (self.name, self.test_inputs, self.requires_lock)

    def __eq__(self, other):
        return self.name == other.name and self.test_inputs == other.test_inputs

    def __ne__(self, other):
        # Python 2 does not derive != from ==; keep the two consistent.
        return not self == other


class Sharder(object):
    """Groups test inputs into TestShards."""

    def __init__(self, test_split_fn, max_locked_shards):
        """
        Args:
            test_split_fn: callable taking a test name; element 0 of its
                return value is used as the test's directory.
            max_locked_shards: maximum number of locked shards to produce.
                It is used as a divisor, so zero raises ZeroDivisionError.
        """
        self._split = test_split_fn
        self._max_locked_shards = max_locked_shards

    def shard_tests(self, test_inputs, num_workers, fully_parallel):
        """Groups tests into batches.
        This helps ensure that tests that depend on each other (aka bad tests!)
        continue to run together as most cross-tests dependencies tend to
        occur within the same directory.
        Return:
            Two list of TestShards. The first contains tests that must only be
            run under the server lock, the second can be run whenever.
        """

        # FIXME: Move all of the sharding logic out of manager into its
        # own class or module. Consider grouping it with the chunking logic
        # in prepare_lists as well.
        if num_workers == 1:
            return self._shard_in_two(test_inputs)
        elif fully_parallel:
            return self._shard_every_file(test_inputs)
        return self._shard_by_directory(test_inputs)

    def _shard_in_two(self, test_inputs):
        """Returns two lists of shards, one with all the tests requiring a lock and one with the rest.

        This is used when there's only one worker, to minimize the per-shard overhead."""
        locked_inputs = []
        unlocked_inputs = []
        for test_input in test_inputs:
            if test_input.requires_lock:
                locked_inputs.append(test_input)
            else:
                unlocked_inputs.append(test_input)

        # A shard cannot be empty, so only create the ones that have tests.
        locked_shards = []
        unlocked_shards = []
        if locked_inputs:
            locked_shards = [TestShard('locked_tests', locked_inputs)]
        if unlocked_inputs:
            unlocked_shards = [TestShard('unlocked_tests', unlocked_inputs)]

        return locked_shards, unlocked_shards

    def _shard_every_file(self, test_inputs):
        """Returns two lists of shards, each shard containing a single test file.

        This mode gets maximal parallelism at the cost of much higher flakiness."""
        locked_shards = []
        unlocked_shards = []
        virtual_inputs = []

        for test_input in test_inputs:
            # Note that we use a '.' for the shard name; the name doesn't really
            # matter, and the only other meaningful value would be the filename,
            # which would be really redundant.
            if test_input.requires_lock:
                locked_shards.append(TestShard('.', [test_input]))
            elif test_input.test_name.startswith('virtual'):
                # This violates the spirit of sharding every file, but in practice, since the
                # virtual test suites require a different commandline flag and thus a restart
                # of content_shell, it's too slow to shard them fully.
                virtual_inputs.append(test_input)
            else:
                unlocked_shards.append(TestShard('.', [test_input]))

        locked_virtual_shards, unlocked_virtual_shards = self._shard_by_directory(virtual_inputs)

        # The locked shards still need to be limited to self._max_locked_shards in order to not
        # overload the http server for the http tests.
        return (self._resize_shards(locked_virtual_shards + locked_shards, self._max_locked_shards, 'locked_shard'),
                unlocked_virtual_shards + unlocked_shards)

    def _shard_by_directory(self, test_inputs):
        """Returns two lists of shards, each shard containing all the files in a directory.

        This is the default mode, and gets as much parallelism as we can while
        minimizing flakiness caused by inter-test dependencies."""
        locked_shards = []
        unlocked_shards = []
        unlocked_slow_shards = []
        tests_by_dir = {}
        # FIXME: Given that the tests are already sorted by directory,
        # we can probably rewrite this to be clearer and faster.
        for test_input in test_inputs:
            directory = self._split(test_input.test_name)[0]
            tests_by_dir.setdefault(directory, []).append(test_input)

        # Iteration order does not matter here: every list is sorted below.
        for directory, dir_inputs in tests_by_dir.items():
            shard = TestShard(directory, dir_inputs)
            if dir_inputs[0].requires_lock:
                locked_shards.append(shard)
            # In practice, virtual test suites are slow to run. It's a bit hacky, but
            # put them first since they're the long-tail of test runtime.
            elif directory.startswith('virtual'):
                unlocked_slow_shards.append(shard)
            else:
                unlocked_shards.append(shard)

        # Sort the shards by directory name.
        locked_shards.sort(key=lambda shard: shard.name)
        unlocked_slow_shards.sort(key=lambda shard: shard.name)
        unlocked_shards.sort(key=lambda shard: shard.name)

        # Put a ceiling on the number of locked shards, so that we
        # don't hammer the servers too badly.

        # FIXME: For now, limit to one shard or set it
        # with the --max-locked-shards. After testing to make sure we
        # can handle multiple shards, we should probably do something like
        # limit this to no more than a quarter of all workers, e.g.:
        # return max(math.ceil(num_workers / 4.0), 1)
        return (self._resize_shards(locked_shards, self._max_locked_shards, 'locked_shard'),
                unlocked_slow_shards + unlocked_shards)

    def _resize_shards(self, old_shards, max_new_shards, shard_name_prefix):
        """Takes a list of shards and redistributes the tests into no more
        than |max_new_shards| new shards.

        The new shards are named '<shard_name_prefix>_<n>', counting from 1.
        An empty |old_shards| yields an empty list."""

        # This implementation assumes that each input shard only contains tests from a
        # single directory, and that tests in each shard must remain together; as a
        # result, a given input shard is never split between output shards.
        #
        # Each output shard contains the tests from one or more input shards and
        # hence may contain tests from multiple directories.

        def divide_and_round_up(numerator, divisor):
            return int(math.ceil(float(numerator) / divisor))

        def extract_and_flatten(shards):
            test_inputs = []
            for shard in shards:
                test_inputs.extend(shard.test_inputs)
            return test_inputs

        def split_at(seq, index):
            return (seq[:index], seq[index:])

        num_old_per_new = divide_and_round_up(len(old_shards), max_new_shards)
        new_shards = []
        remaining_shards = old_shards
        while remaining_shards:
            some_shards, remaining_shards = split_at(remaining_shards, num_old_per_new)
            new_shards.append(TestShard('%s_%d' % (shard_name_prefix, len(new_shards) + 1), extract_and_flatten(some_shards)))
        return new_shards