#!/usr/bin/env python # Copyright (C) 2011 Google Inc. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are # met: # # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above # copyright notice, this list of conditions and the following disclaimer # in the documentation and/or other materials provided with the # distribution. # * Neither the name of Google Inc. nor the names of its # contributors may be used to endorse or promote products derived from # this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. """ The TestRunner2 package is an alternate implementation of the TestRunner class that uses the manager_worker_broker module to send sets of tests to workers and receive their completion messages accordingly. """ import logging import time from webkitpy.tool import grammar from webkitpy.layout_tests.layout_package import manager_worker_broker from webkitpy.layout_tests.layout_package import test_runner from webkitpy.layout_tests.layout_package import worker _log = logging.getLogger(__name__) class _WorkerState(object): """A class for the TestRunner/manager to use to track the current state of the workers.""" def __init__(self, number, worker_connection): self.worker_connection = worker_connection self.number = number self.done = False self.current_test_name = None self.next_timeout = None self.wedged = False self.stats = {} self.stats['name'] = worker_connection.name self.stats['num_tests'] = 0 self.stats['total_time'] = 0 def __repr__(self): return "_WorkerState(" + str(self.__dict__) + ")" class TestRunner2(test_runner.TestRunner): def __init__(self, port, options, printer): test_runner.TestRunner.__init__(self, port, options, printer) self._all_results = [] self._group_stats = {} self._current_result_summary = None # This maps worker names to the state we are tracking for each of them. self._worker_states = {} def is_done(self): worker_states = self._worker_states.values() return worker_states and all(self._worker_is_done(worker_state) for worker_state in worker_states) def _worker_is_done(self, worker_state): t = time.time() if worker_state.done or worker_state.wedged: return True next_timeout = worker_state.next_timeout WEDGE_PADDING = 40.0 if next_timeout and t > next_timeout + WEDGE_PADDING: _log.error('') worker_state.worker_connection.log_wedged_worker(worker_state.current_test_name) _log.error('') worker_state.wedged = True return True return False def name(self): return 'TestRunner2' def _run_tests(self, file_list, result_summary): """Runs the tests in the file_list. Return: A tuple (interrupted, keyboard_interrupted, thread_timings, test_timings, individual_test_timings) interrupted is whether the run was interrupted keyboard_interrupted is whether someone typed Ctrl^C thread_timings is a list of dicts with the total runtime of each thread with 'name', 'num_tests', 'total_time' properties test_timings is a list of timings for each sharded subdirectory of the form [time, directory_name, num_tests] individual_test_timings is a list of run times for each test in the form {filename:filename, test_run_time:test_run_time} result_summary: summary object to populate with the results """ self._current_result_summary = result_summary self._all_results = [] self._group_stats = {} self._worker_states = {} keyboard_interrupted = False interrupted = False thread_timings = [] self._printer.print_update('Sharding tests ...') test_lists = self._shard_tests(file_list, (int(self._options.child_processes) > 1) and not self._options.experimental_fully_parallel) num_workers = self._num_workers(len(test_lists)) manager_connection = manager_worker_broker.get(self._port, self._options, self, worker.Worker) if self._options.dry_run: return (keyboard_interrupted, interrupted, thread_timings, self._group_stats, self._all_results) self._printer.print_update('Starting %s ...' % grammar.pluralize('worker', num_workers)) for worker_number in xrange(num_workers): worker_connection = manager_connection.start_worker(worker_number) worker_state = _WorkerState(worker_number, worker_connection) self._worker_states[worker_connection.name] = worker_state # FIXME: If we start workers up too quickly, DumpRenderTree appears # to thrash on something and time out its first few tests. Until # we can figure out what's going on, sleep a bit in between # workers. time.sleep(0.1) self._printer.print_update("Starting testing ...") for test_list in test_lists: manager_connection.post_message('test_list', test_list[0], test_list[1]) # We post one 'stop' message for each worker. Because the stop message # are sent after all of the tests, and because each worker will stop # reading messsages after receiving a stop, we can be sure each # worker will get a stop message and hence they will all shut down. for i in xrange(num_workers): manager_connection.post_message('stop') try: while not self.is_done(): # We loop with a timeout in order to be able to detect wedged threads. manager_connection.run_message_loop(delay_secs=1.0) if any(worker_state.wedged for worker_state in self._worker_states.values()): _log.error('') _log.error('Remaining workers are wedged, bailing out.') _log.error('') else: _log.debug('No wedged threads') # Make sure all of the workers have shut down (if possible). for worker_state in self._worker_states.values(): if not worker_state.wedged and worker_state.worker_connection.is_alive(): worker_state.worker_connection.join(0.5) assert not worker_state.worker_connection.is_alive() except KeyboardInterrupt: _log.info("Interrupted, exiting") self.cancel_workers() keyboard_interrupted = True except test_runner.TestRunInterruptedException, e: _log.info(e.reason) self.cancel_workers() interrupted = True except: # Unexpected exception; don't try to clean up workers. _log.info("Exception raised, exiting") raise thread_timings = [worker_state.stats for worker_state in self._worker_states.values()] # FIXME: should this be a class instead of a tuple? return (interrupted, keyboard_interrupted, thread_timings, self._group_stats, self._all_results) def cancel_workers(self): for worker_state in self._worker_states.values(): worker_state.worker_connection.cancel() def handle_started_test(self, source, test_info, hang_timeout): worker_state = self._worker_states[source] worker_state.current_test_name = self._port.relative_test_filename(test_info.filename) worker_state.next_timeout = time.time() + hang_timeout def handle_done(self, source): worker_state = self._worker_states[source] worker_state.done = True def handle_exception(self, source, exception_info): exception_type, exception_value, exception_traceback = exception_info raise exception_type, exception_value, exception_traceback def handle_finished_list(self, source, list_name, num_tests, elapsed_time): self._group_stats[list_name] = (num_tests, elapsed_time) def handle_finished_test(self, source, result, elapsed_time): worker_state = self._worker_states[source] worker_state.next_timeout = None worker_state.current_test_name = None worker_state.stats['total_time'] += elapsed_time worker_state.stats['num_tests'] += 1 if worker_state.wedged: # This shouldn't happen if we have our timeouts tuned properly. _log.error("%s unwedged", source) self._all_results.append(result) self._update_summary_with_result(self._current_result_summary, result)