# Copyright (C) 2011 Google Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import logging
import math
import time

from webkitpy.common import message_pool
from webkitpy.layout_tests.controllers import single_test_runner
from webkitpy.layout_tests.models import test_expectations
from webkitpy.layout_tests.models import test_failures
from webkitpy.layout_tests.models import test_results
from webkitpy.layout_tests.models.test_run_results import TestRunResults
from webkitpy.tool import grammar


_log = logging.getLogger(__name__)


TestExpectations = test_expectations.TestExpectations

# Export this so callers don't need to know about message pools.
WorkerException = message_pool.WorkerException


class TestRunInterruptedException(Exception):
    """Raised when a test run should be stopped immediately."""
    def __init__(self, reason):
        Exception.__init__(self)
        self.reason = reason
        self.msg = reason

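    # Support pickling so the exception can cross the message-pool process
    # boundary with its reason intact.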
    def __reduce__(self):
        return self.__class__, (self.reason,)


class LayoutTestRunner(object):
    def __init__(self, options, port, printer, results_directory, test_is_slow_fn):
        self._options = options
        self._port = port
        self._printer = printer
        self._results_directory = results_directory
        self._test_is_slow = test_is_slow_fn
        self._sharder = Sharder(self._port.split_test, self._options.max_locked_shards)
        self._filesystem = self._port.host.filesystem

        self._expectations = None
        self._test_inputs = []
        self._retrying = False

        self._current_run_results = None

    def run_tests(self, expectations, test_inputs, tests_to_skip, num_workers, retrying):
        self._expectations = expectations
        self._test_inputs = test_inputs
        self._retrying = retrying
        self._shards_to_redo = []

        # FIXME: rename all variables to test_run_results or some such ...
        run_results = TestRunResults(self._expectations, len(test_inputs) + len(tests_to_skip))
        self._current_run_results = run_results
        self._printer.num_tests = len(test_inputs)
        self._printer.num_completed = 0

        if not retrying:
            self._printer.print_expected(run_results, self._expectations.get_tests_with_result_type)

        for test_name in set(tests_to_skip):
            result = test_results.TestResult(test_name)
            result.type = test_expectations.SKIP
            run_results.add(result, expected=True, test_is_slow=self._test_is_slow(test_name))

        self._printer.write_update('Sharding tests ...')
        locked_shards, unlocked_shards = self._sharder.shard_tests(test_inputs, int(self._options.child_processes), self._options.fully_parallel)

        # We don't have a good way to coordinate the workers so that they don't
        # try to run the shards that need a lock. The easiest solution is to
        # run all of the locked shards first.
        all_shards = locked_shards + unlocked_shards
        num_workers = min(num_workers, len(all_shards))
        self._printer.print_workers_and_shards(num_workers, len(all_shards), len(locked_shards))

        if self._options.dry_run:
            return run_results

        self._printer.write_update('Starting %s ...' % grammar.pluralize('worker', num_workers))

        start_time = time.time()
        try:
            with message_pool.get(self, self._worker_factory, num_workers, self._port.host) as pool:
                pool.run(('test_list', shard.name, shard.test_inputs) for shard in all_shards)

            if self._shards_to_redo:
                num_workers -= len(self._shards_to_redo)
                if num_workers > 0:
                    with message_pool.get(self, self._worker_factory, num_workers, self._port.host) as pool:
                        pool.run(('test_list', shard.name, shard.test_inputs) for shard in self._shards_to_redo)
        except TestRunInterruptedException as e:
            _log.warning(e.reason)
            run_results.interrupted = True
        except KeyboardInterrupt:
            self._printer.flush()
            self._printer.writeln('Interrupted, exiting ...')
            run_results.keyboard_interrupted = True
        except Exception as e:
            _log.debug('%s("%s") raised, exiting' % (e.__class__.__name__, str(e)))
            raise
        finally:
            run_results.run_time = time.time() - start_time

        return run_results

    def _worker_factory(self, worker_connection):
        results_directory = self._results_directory
        if self._retrying:
            self._filesystem.maybe_make_directory(self._filesystem.join(self._results_directory, 'retries'))
            results_directory = self._filesystem.join(self._results_directory, 'retries')
        return Worker(worker_connection, results_directory, self._options)

    def _mark_interrupted_tests_as_skipped(self, run_results):
        for test_input in self._test_inputs:
            if test_input.test_name not in run_results.results_by_name:
                result = test_results.TestResult(test_input.test_name, [test_failures.FailureEarlyExit()])
                # FIXME: We probably need to loop here if there are multiple iterations.
                # FIXME: Also, these results are really neither expected nor unexpected. We probably
                # need a third type of result.
                run_results.add(result, expected=False, test_is_slow=self._test_is_slow(test_input.test_name))

    def _interrupt_if_at_failure_limits(self, run_results):
        # Note: The messages in this method are constructed to match old-run-webkit-tests
        # so that existing buildbot grep rules work.
        def interrupt_if_at_failure_limit(limit, failure_count, run_results, message):
            if limit and failure_count >= limit:
                message += " %d tests run." % (run_results.expected + run_results.unexpected)
                self._mark_interrupted_tests_as_skipped(run_results)
                raise TestRunInterruptedException(message)

        interrupt_if_at_failure_limit(
            self._options.exit_after_n_failures,
            run_results.unexpected_failures,
            run_results,
            "Exiting early after %d failures." % run_results.unexpected_failures)
        interrupt_if_at_failure_limit(
            self._options.exit_after_n_crashes_or_timeouts,
            run_results.unexpected_crashes + run_results.unexpected_timeouts,
            run_results,
            # This differs from ORWT because it does not include WebProcess crashes.
            "Exiting early after %d crashes and %d timeouts." % (run_results.unexpected_crashes, run_results.unexpected_timeouts))

    def _update_summary_with_result(self, run_results, result):
        expected = self._expectations.matches_an_expected_result(result.test_name, result.type, self._options.pixel_tests or result.reftest_type)
        exp_str = self._expectations.get_expectations_string(result.test_name)
        got_str = self._expectations.expectation_to_string(result.type)

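        # A device failure says nothing about the test itself, so report the
        # test as aborted without recording a pass/fail result.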
        if result.device_failed:
            self._printer.print_finished_test(result, False, exp_str, "Aborted")
            return

        run_results.add(result, expected, self._test_is_slow(result.test_name))
        self._printer.print_finished_test(result, expected, exp_str, got_str)
        self._interrupt_if_at_failure_limits(run_results)

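    # Messages posted by workers (e.g. 'finished_test') are dispatched to the
    # corresponding _handle_* method below.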
    def handle(self, name, source, *args):
        method = getattr(self, '_handle_' + name, None)
        if method:
            return method(source, *args)
        raise AssertionError('unknown message %s received from %s, args=%s' % (name, source, repr(args)))

    def _handle_started_test(self, worker_name, test_input, test_timeout_sec):
        self._printer.print_started_test(test_input.test_name)

    def _handle_finished_test_list(self, worker_name, list_name):
        pass

    def _handle_finished_test(self, worker_name, result, log_messages=None):
        self._update_summary_with_result(self._current_run_results, result)

    def _handle_device_failed(self, worker_name, list_name, remaining_tests):
        _log.warning("%s has failed" % worker_name)
        if remaining_tests:
            self._shards_to_redo.append(TestShard(list_name, remaining_tests))


class Worker(object):
    def __init__(self, caller, results_directory, options):
        self._caller = caller
        self._worker_number = caller.worker_number
        self._name = caller.name
        self._results_directory = results_directory
        self._options = options

        # The remaining fields are initialized in start()
        self._host = None
        self._port = None
        self._batch_size = None
        self._batch_count = None
        self._filesystem = None
        self._driver = None
        self._num_tests = 0

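    # Last-resort cleanup: make sure the driver is shut down even if stop()
    # was never called explicitly.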
    def __del__(self):
        self.stop()

    def start(self):
        """Called when the object is starting to be used, at which point it is
        safe to create state that does not need to be pickled (usually this
        means it is called in a child process)."""
        self._host = self._caller.host
        self._filesystem = self._host.filesystem
        self._port = self._host.port_factory.get(self._options.platform, self._options)

        self._batch_count = 0
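        # A batch_size of 0 disables batching; the driver is then never
        # restarted just for hitting the batch limit (see _run_test).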
        self._batch_size = self._options.batch_size or 0

    def handle(self, name, source, test_list_name, test_inputs):
        assert name == 'test_list'
        for i, test_input in enumerate(test_inputs):
            device_failed = self._run_test(test_input, test_list_name)
            if device_failed:
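                # Hand the unfinished tests back to the manager so it can
                # reshard them onto the workers that are still running.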
                self._caller.post('device_failed', test_list_name, test_inputs[i:])
                self._caller.stop_running()
                return

        self._caller.post('finished_test_list', test_list_name)

    def _update_test_input(self, test_input):
        if test_input.reference_files is None:
            # Lazy initialization.
            test_input.reference_files = self._port.reference_files(test_input.test_name)
        if test_input.reference_files:
            test_input.should_run_pixel_test = True
        else:
            test_input.should_run_pixel_test = self._port.should_run_as_pixel_test(test_input)

    def _run_test(self, test_input, shard_name):
        self._batch_count += 1

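        # Once batch_size tests have run, ask the driver to exit after this
        # test so the next one starts with a fresh driver process.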
        stop_when_done = False
        if self._batch_size > 0 and self._batch_count >= self._batch_size:
            self._batch_count = 0
            stop_when_done = True

        self._update_test_input(test_input)
        test_timeout_sec = self._timeout(test_input)
        start = time.time()
        device_failed = False

        if self._driver and self._driver.has_crashed():
            self._kill_driver()
        if not self._driver:
            self._driver = self._port.create_driver(self._worker_number)

        if not self._driver:
            # FIXME: Is this the best way to handle a device crashing in the middle of the test, or should we create
            # a new failure type?
            device_failed = True
            return device_failed

        self._caller.post('started_test', test_input, test_timeout_sec)
        result = single_test_runner.run_single_test(self._port, self._options, self._results_directory,
            self._name, self._driver, test_input, stop_when_done)

        result.shard_name = shard_name
        result.worker_name = self._name
        result.total_run_time = time.time() - start
        result.test_number = self._num_tests
        self._num_tests += 1
        self._caller.post('finished_test', result)
        self._clean_up_after_test(test_input, result)
        return result.device_failed

    def stop(self):
        _log.debug("%s cleaning up" % self._name)
        self._kill_driver()

    def _timeout(self, test_input):
        """Compute the appropriate timeout value for a test."""
        # The driver watchdog uses 2.5x the timeout; we want to be
        # larger than that. We also add a little more padding if we're
        # running tests in a separate thread.
        #
        # Note that we need to convert the test timeout from a
        # string value in milliseconds to a float for Python.
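        # For example, a test_input.timeout of "6000" (6 seconds) yields a
        # driver timeout of 3.0 * 6.0 = 18.0 seconds.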

        # FIXME: Can we just return the test_input.timeout now?
        driver_timeout_sec = 3.0 * float(test_input.timeout) / 1000.0
        return driver_timeout_sec

    def _kill_driver(self):
        # Be careful about how and when we kill the driver; if driver.stop()
        # raises an exception, this routine may get re-entered via __del__.
        driver = self._driver
        self._driver = None
        if driver:
            _log.debug("%s killing driver" % self._name)
            driver.stop()

    def _clean_up_after_test(self, test_input, result):
        test_name = test_input.test_name

        if result.failures:
            # Check and kill the driver if we need to.
            if any(f.driver_needs_restart() for f in result.failures):
                self._kill_driver()
                # Reset the batch count since the shell just bounced.
                self._batch_count = 0

            # Print the error message(s).
            _log.debug("%s %s failed:" % (self._name, test_name))
            for f in result.failures:
                _log.debug("%s  %s" % (self._name, f.message()))
        elif result.type == test_expectations.SKIP:
            _log.debug("%s %s skipped" % (self._name, test_name))
        else:
            _log.debug("%s %s passed" % (self._name, test_name))


class TestShard(object):
    """A test shard is a named list of TestInputs."""

    def __init__(self, name, test_inputs):
        self.name = name
        self.test_inputs = test_inputs
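        # Note: a shard's lock requirement is taken from its first test input;
        # the sharders are expected to group tests that share a requirement.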
        self.requires_lock = test_inputs[0].requires_lock

    def __repr__(self):
        return "TestShard(name='%s', test_inputs=%s, requires_lock=%s)" % (self.name, self.test_inputs, self.requires_lock)

    def __eq__(self, other):
        return self.name == other.name and self.test_inputs == other.test_inputs


class Sharder(object):
    def __init__(self, test_split_fn, max_locked_shards):
        self._split = test_split_fn
        self._max_locked_shards = max_locked_shards

    def shard_tests(self, test_inputs, num_workers, fully_parallel):
        """Groups tests into batches.

        This helps ensure that tests that depend on each other (aka bad tests!)
        continue to run together, as most cross-test dependencies tend to
        occur within the same directory.

        Returns:
            Two lists of TestShards. The first contains tests that must be
            run under the server lock; the second can be run whenever.
        """

        # FIXME: Move all of the sharding logic out of manager into its
        # own class or module. Consider grouping it with the chunking logic
        # in prepare_lists as well.
        if num_workers == 1:
            return self._shard_in_two(test_inputs)
        elif fully_parallel:
            return self._shard_every_file(test_inputs)
        return self._shard_by_directory(test_inputs)

    def _shard_in_two(self, test_inputs):
        """Returns two lists of shards, one with all the tests requiring a lock and one with the rest.

        This is used when there's only one worker, to minimize the per-shard overhead."""
        locked_inputs = []
        unlocked_inputs = []
        for test_input in test_inputs:
            if test_input.requires_lock:
                locked_inputs.append(test_input)
            else:
                unlocked_inputs.append(test_input)

        locked_shards = []
        unlocked_shards = []
        if locked_inputs:
            locked_shards = [TestShard('locked_tests', locked_inputs)]
        if unlocked_inputs:
            unlocked_shards = [TestShard('unlocked_tests', unlocked_inputs)]

        return locked_shards, unlocked_shards

    def _shard_every_file(self, test_inputs):
        """Returns two lists of shards, each shard containing a single test file.

        This mode gets maximal parallelism at the cost of much higher flakiness."""
        locked_shards = []
        unlocked_shards = []
        virtual_inputs = []

        for test_input in test_inputs:
            # Note that we use a '.' for the shard name; the name doesn't really
            # matter, and the only other meaningful value would be the filename,
            # which would be really redundant.
            if test_input.requires_lock:
                locked_shards.append(TestShard('.', [test_input]))
            elif test_input.test_name.startswith('virtual'):
                # This violates the spirit of sharding every file, but in practice, since the
                # virtual test suites require a different commandline flag and thus a restart
                # of content_shell, it's too slow to shard them fully.
                virtual_inputs.append(test_input)
            else:
                unlocked_shards.append(TestShard('.', [test_input]))

        locked_virtual_shards, unlocked_virtual_shards = self._shard_by_directory(virtual_inputs)

        # The locked shards still need to be limited to self._max_locked_shards in order to not
        # overload the http server for the http tests.
        return (self._resize_shards(locked_virtual_shards + locked_shards, self._max_locked_shards, 'locked_shard'),
            unlocked_virtual_shards + unlocked_shards)

    def _shard_by_directory(self, test_inputs):
        """Returns two lists of shards, each shard containing all the files in a directory.

        This is the default mode, and gets as much parallelism as we can while
        minimizing flakiness caused by inter-test dependencies."""
        locked_shards = []
        unlocked_shards = []
        unlocked_slow_shards = []
        tests_by_dir = {}
        # FIXME: Given that the tests are already sorted by directory,
        # we can probably rewrite this to be clearer and faster.
        for test_input in test_inputs:
            directory = self._split(test_input.test_name)[0]
            tests_by_dir.setdefault(directory, []).append(test_input)

        for directory, test_inputs in tests_by_dir.iteritems():
            shard = TestShard(directory, test_inputs)
            if test_inputs[0].requires_lock:
                locked_shards.append(shard)
            # In practice, virtual test suites are slow to run. It's a bit hacky, but
            # put them first since they're the long-tail of test runtime.
            elif directory.startswith('virtual'):
                unlocked_slow_shards.append(shard)
            else:
                unlocked_shards.append(shard)

        # Sort the shards by directory name.
        locked_shards.sort(key=lambda shard: shard.name)
        unlocked_slow_shards.sort(key=lambda shard: shard.name)
        unlocked_shards.sort(key=lambda shard: shard.name)

        # Put a ceiling on the number of locked shards, so that we
        # don't hammer the servers too badly.

        # FIXME: For now, limit to one shard or set it
        # with the --max-locked-shards. After testing to make sure we
        # can handle multiple shards, we should probably do something like
        # limit this to no more than a quarter of all workers, e.g.:
        # return max(math.ceil(num_workers / 4.0), 1)
        return (self._resize_shards(locked_shards, self._max_locked_shards, 'locked_shard'),
                unlocked_slow_shards + unlocked_shards)

    def _resize_shards(self, old_shards, max_new_shards, shard_name_prefix):
        """Takes a list of shards and redistributes the tests into no more
        than |max_new_shards| new shards."""

        # This implementation assumes that each input shard only contains tests from a
        # single directory, and that tests in each shard must remain together; as a
        # result, a given input shard is never split between output shards.
        #
        # Each output shard contains the tests from one or more input shards and
        # hence may contain tests from multiple directories.

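        # For example, squeezing ten old shards into at most four new shards
        # groups ceil(10 / 4) = 3 old shards per new shard, producing new
        # shards built from 3, 3, 3, and 1 old shards respectively.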
        def divide_and_round_up(numerator, divisor):
            return int(math.ceil(float(numerator) / divisor))

        def extract_and_flatten(shards):
            test_inputs = []
            for shard in shards:
                test_inputs.extend(shard.test_inputs)
            return test_inputs

        def split_at(seq, index):
            return (seq[:index], seq[index:])

        num_old_per_new = divide_and_round_up(len(old_shards), max_new_shards)
        new_shards = []
        remaining_shards = old_shards
        while remaining_shards:
            some_shards, remaining_shards = split_at(remaining_shards, num_old_per_new)
            new_shards.append(TestShard('%s_%d' % (shard_name_prefix, len(new_shards) + 1), extract_and_flatten(some_shards)))
        return new_shards