1#!/usr/bin/env python 2# Copyright (C) 2011 Google Inc. All rights reserved. 3# 4# Redistribution and use in source and binary forms, with or without 5# modification, are permitted provided that the following conditions are 6# met: 7# 8# * Redistributions of source code must retain the above copyright 9# notice, this list of conditions and the following disclaimer. 10# * Redistributions in binary form must reproduce the above 11# copyright notice, this list of conditions and the following disclaimer 12# in the documentation and/or other materials provided with the 13# distribution. 14# * Neither the name of Google Inc. nor the names of its 15# contributors may be used to endorse or promote products derived from 16# this software without specific prior written permission. 17# 18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 19# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 20# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 21# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 22# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 23# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 24# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 25# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 26# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 27# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 28# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 29 30""" 31The TestRunner2 package is an alternate implementation of the TestRunner 32class that uses the manager_worker_broker module to send sets of tests to 33workers and receive their completion messages accordingly. 34""" 35 36import logging 37import time 38 39from webkitpy.tool import grammar 40 41from webkitpy.layout_tests.layout_package import manager_worker_broker 42from webkitpy.layout_tests.layout_package import test_runner 43from webkitpy.layout_tests.layout_package import worker 44 45 46_log = logging.getLogger(__name__) 47 48 49class _WorkerState(object): 50 """A class for the TestRunner/manager to use to track the current state 51 of the workers.""" 52 def __init__(self, number, worker_connection): 53 self.worker_connection = worker_connection 54 self.number = number 55 self.done = False 56 self.current_test_name = None 57 self.next_timeout = None 58 self.wedged = False 59 self.stats = {} 60 self.stats['name'] = worker_connection.name 61 self.stats['num_tests'] = 0 62 self.stats['total_time'] = 0 63 64 def __repr__(self): 65 return "_WorkerState(" + str(self.__dict__) + ")" 66 67 68class TestRunner2(test_runner.TestRunner): 69 def __init__(self, port, options, printer): 70 test_runner.TestRunner.__init__(self, port, options, printer) 71 self._all_results = [] 72 self._group_stats = {} 73 self._current_result_summary = None 74 75 # This maps worker names to the state we are tracking for each of them. 76 self._worker_states = {} 77 78 def is_done(self): 79 worker_states = self._worker_states.values() 80 return worker_states and all(self._worker_is_done(worker_state) for worker_state in worker_states) 81 82 def _worker_is_done(self, worker_state): 83 t = time.time() 84 if worker_state.done or worker_state.wedged: 85 return True 86 87 next_timeout = worker_state.next_timeout 88 WEDGE_PADDING = 40.0 89 if next_timeout and t > next_timeout + WEDGE_PADDING: 90 _log.error('') 91 worker_state.worker_connection.log_wedged_worker(worker_state.current_test_name) 92 _log.error('') 93 worker_state.wedged = True 94 return True 95 return False 96 97 def name(self): 98 return 'TestRunner2' 99 100 def _run_tests(self, file_list, result_summary): 101 """Runs the tests in the file_list. 102 103 Return: A tuple (interrupted, keyboard_interrupted, thread_timings, 104 test_timings, individual_test_timings) 105 interrupted is whether the run was interrupted 106 keyboard_interrupted is whether someone typed Ctrl^C 107 thread_timings is a list of dicts with the total runtime 108 of each thread with 'name', 'num_tests', 'total_time' properties 109 test_timings is a list of timings for each sharded subdirectory 110 of the form [time, directory_name, num_tests] 111 individual_test_timings is a list of run times for each test 112 in the form {filename:filename, test_run_time:test_run_time} 113 result_summary: summary object to populate with the results 114 """ 115 self._current_result_summary = result_summary 116 self._all_results = [] 117 self._group_stats = {} 118 self._worker_states = {} 119 120 keyboard_interrupted = False 121 interrupted = False 122 thread_timings = [] 123 124 self._printer.print_update('Sharding tests ...') 125 test_lists = self._shard_tests(file_list, 126 (int(self._options.child_processes) > 1) and not self._options.experimental_fully_parallel) 127 128 num_workers = self._num_workers(len(test_lists)) 129 130 manager_connection = manager_worker_broker.get(self._port, self._options, 131 self, worker.Worker) 132 133 if self._options.dry_run: 134 return (keyboard_interrupted, interrupted, thread_timings, 135 self._group_stats, self._all_results) 136 137 self._printer.print_update('Starting %s ...' % 138 grammar.pluralize('worker', num_workers)) 139 for worker_number in xrange(num_workers): 140 worker_connection = manager_connection.start_worker(worker_number) 141 worker_state = _WorkerState(worker_number, worker_connection) 142 self._worker_states[worker_connection.name] = worker_state 143 144 # FIXME: If we start workers up too quickly, DumpRenderTree appears 145 # to thrash on something and time out its first few tests. Until 146 # we can figure out what's going on, sleep a bit in between 147 # workers. 148 time.sleep(0.1) 149 150 self._printer.print_update("Starting testing ...") 151 for test_list in test_lists: 152 manager_connection.post_message('test_list', test_list[0], test_list[1]) 153 154 # We post one 'stop' message for each worker. Because the stop message 155 # are sent after all of the tests, and because each worker will stop 156 # reading messsages after receiving a stop, we can be sure each 157 # worker will get a stop message and hence they will all shut down. 158 for i in xrange(num_workers): 159 manager_connection.post_message('stop') 160 161 try: 162 while not self.is_done(): 163 # We loop with a timeout in order to be able to detect wedged threads. 164 manager_connection.run_message_loop(delay_secs=1.0) 165 166 if any(worker_state.wedged for worker_state in self._worker_states.values()): 167 _log.error('') 168 _log.error('Remaining workers are wedged, bailing out.') 169 _log.error('') 170 else: 171 _log.debug('No wedged threads') 172 173 # Make sure all of the workers have shut down (if possible). 174 for worker_state in self._worker_states.values(): 175 if not worker_state.wedged and worker_state.worker_connection.is_alive(): 176 worker_state.worker_connection.join(0.5) 177 assert not worker_state.worker_connection.is_alive() 178 179 except KeyboardInterrupt: 180 _log.info("Interrupted, exiting") 181 self.cancel_workers() 182 keyboard_interrupted = True 183 except test_runner.TestRunInterruptedException, e: 184 _log.info(e.reason) 185 self.cancel_workers() 186 interrupted = True 187 except: 188 # Unexpected exception; don't try to clean up workers. 189 _log.info("Exception raised, exiting") 190 raise 191 192 thread_timings = [worker_state.stats for worker_state in self._worker_states.values()] 193 194 # FIXME: should this be a class instead of a tuple? 195 return (interrupted, keyboard_interrupted, thread_timings, 196 self._group_stats, self._all_results) 197 198 def cancel_workers(self): 199 for worker_state in self._worker_states.values(): 200 worker_state.worker_connection.cancel() 201 202 def handle_started_test(self, source, test_info, hang_timeout): 203 worker_state = self._worker_states[source] 204 worker_state.current_test_name = self._port.relative_test_filename(test_info.filename) 205 worker_state.next_timeout = time.time() + hang_timeout 206 207 def handle_done(self, source): 208 worker_state = self._worker_states[source] 209 worker_state.done = True 210 211 def handle_exception(self, source, exception_info): 212 exception_type, exception_value, exception_traceback = exception_info 213 raise exception_type, exception_value, exception_traceback 214 215 def handle_finished_list(self, source, list_name, num_tests, elapsed_time): 216 self._group_stats[list_name] = (num_tests, elapsed_time) 217 218 def handle_finished_test(self, source, result, elapsed_time): 219 worker_state = self._worker_states[source] 220 worker_state.next_timeout = None 221 worker_state.current_test_name = None 222 worker_state.stats['total_time'] += elapsed_time 223 worker_state.stats['num_tests'] += 1 224 225 if worker_state.wedged: 226 # This shouldn't happen if we have our timeouts tuned properly. 227 _log.error("%s unwedged", source) 228 229 self._all_results.append(result) 230 self._update_summary_with_result(self._current_result_summary, result) 231