# Copyright (C) 2010 Google Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import optparse
import Queue
import sys
import unittest

try:
    import multiprocessing
except ImportError:
    multiprocessing = None


from webkitpy.common.system import outputcapture

from webkitpy.layout_tests import port
from webkitpy.layout_tests.layout_package import manager_worker_broker
from webkitpy.layout_tests.layout_package import message_broker2
from webkitpy.layout_tests.layout_package import printing

# In order to reliably control when child workers are starting and stopping,
# we use a pair of global variables to hold queues used for messaging. Ideally
# we wouldn't need globals, but we can't pass these through a lexical closure
# because those can't be pickled and sent to a subprocess, and we'd prefer not
# to have to pass extra arguments to the worker in the start_worker() call.
starting_queue = None
stopping_queue = None


def make_broker(manager, worker_model, start_queue=None, stop_queue=None):
    """Build a broker for `worker_model` whose workers are _TestWorker objects.

    The two optional queues are stored in the module-level globals so that
    every _TestWorker constructed afterwards picks them up in its __init__.

    Args:
        manager: object that receives the messages posted by the worker.
        worker_model: value passed on the command line as --worker-model.
        start_queue: optional queue the worker puts an item on when it starts.
        stop_queue: optional queue the worker blocks on before running.

    Returns:
        Whatever manager_worker_broker.get() returns for the 'test' port.
    """
    global starting_queue
    global stopping_queue
    starting_queue = start_queue
    stopping_queue = stop_queue
    options = get_options(worker_model)
    return manager_worker_broker.get(port.get("test"), options, manager, _TestWorker)


class _TestWorker(manager_worker_broker.AbstractWorker):
    """Minimal worker used to exercise the broker from the tests below."""

    def __init__(self, broker_connection, worker_number, options):
        self._broker_connection = broker_connection
        self._options = options
        self._worker_number = worker_number
        self._name = 'TestWorker/%d' % worker_number
        self._stopped = False
        self._canceled = False
        # Snapshot the module-level queues installed by make_broker().
        self._starting_queue = starting_queue
        self._stopping_queue = stopping_queue

    def handle_stop(self, src):
        """Record that a 'stop' message was received."""
        self._stopped = True

    def handle_test(self, src, an_int, a_str):
        """Check the payload of a 'test' message and post a 'test' reply."""
        assert an_int == 1
        assert a_str == "hello, world"
        self._broker_connection.post_message('test', 2, 'hi, everybody')

    def is_done(self):
        """Return True once the worker has been stopped or canceled."""
        return self._stopped or self._canceled

    def name(self):
        return self._name

    def cancel(self):
        """Mark the worker as canceled so that is_done() becomes True."""
        self._canceled = True

    def run(self, port):
        """Run the message loop, then post 'done' (or 'exception' on failure).

        If the queues are set, an item is first put on the starting queue and
        the worker then blocks until an item arrives on the stopping queue.
        """
        if self._starting_queue:
            self._starting_queue.put('')

        if self._stopping_queue:
            self._stopping_queue.get()
        try:
            self._broker_connection.run_message_loop()
            self._broker_connection.yield_to_broker()
            self._broker_connection.post_message('done')
        except Exception:
            # sys.exc_info() is used instead of "except Exception, e" so the
            # clause parses on every Python version (the comma form is a
            # SyntaxError on Python 3 and "as" requires Python 2.6+).
            e = sys.exc_info()[1]
            self._broker_connection.post_message('exception', (type(e), str(e), None))


def get_options(worker_model):
    """Return parsed options for a command line of '--worker-model <model>'.

    The parser accepts the broker's runtime options, the printing options and
    the two extra options declared here.
    """
    option_list = (manager_worker_broker.runtime_options() +
                   printing.print_options() +
                   [optparse.make_option("--experimental-fully-parallel", default=False),
                    optparse.make_option("--child-processes", default='2')])
    parser = optparse.OptionParser(option_list=option_list)
    options, _unused_args = parser.parse_args(args=['--worker-model', worker_model])
    return options



class FunctionTests(unittest.TestCase):
    """Checks that make_broker() accepts or rejects each worker model name."""

    def test_get__inline(self):
        self.assertTrue(make_broker(self, 'inline') is not None)

    def test_get__threads(self):
        self.assertTrue(make_broker(self, 'threads') is not None)

    def test_get__processes(self):
        # This test sometimes fails on Windows. See <http://webkit.org/b/55087>.
        if sys.platform in ('cygwin', 'win32'):
            return

        if multiprocessing:
            self.assertTrue(make_broker(self, 'processes') is not None)
        else:
            self.assertRaises(ValueError, make_broker, self, 'processes')

    def test_get__unknown(self):
        self.assertRaises(ValueError, make_broker, self, 'unknown')


class _TestsMixin(object):
    """Mixin class that implements a series of tests to enforce the
    contract all implementations must follow."""

    # The mixin itself acts as the manager passed to make_broker(): the
    # name(), is_done() and handle_*() methods below are its message API.

    def name(self):
        return 'Tester'

    def is_done(self):
        return self._done

    def handle_done(self, src):
        self._done = True

    def handle_test(self, src, an_int, a_str):
        # Remember the reply payload so the tests can assert on it.
        self._an_int = an_int
        self._a_str = a_str

    def handle_exception(self, src, exc_info):
        self._exception = exc_info
        self._done = True

    def setUp(self):
        self._an_int = None
        self._a_str = None
        self._broker = None
        self._done = False
        self._exception = None
        # Subclasses set the real worker model after calling this setUp().
        self._worker_model = None

    def make_broker(self, starting_queue=None, stopping_queue=None):
        """Create self._broker for this test case's worker model."""
        self._broker = make_broker(self, self._worker_model, starting_queue,
                                   stopping_queue)

    def test_cancel(self):
        self.make_broker()
        worker = self._broker.start_worker(0)
        worker.cancel()
        self._broker.post_message('test', 1, 'hello, world')
        worker.join(0.5)
        self.assertFalse(worker.is_alive())

    def test_done(self):
        self.make_broker()
        worker = self._broker.start_worker(0)
        self._broker.post_message('test', 1, 'hello, world')
        self._broker.post_message('stop')
        self._broker.run_message_loop()
        worker.join(0.5)
        self.assertFalse(worker.is_alive())
        self.assertTrue(self.is_done())
        self.assertEqual(self._an_int, 2)
        self.assertEqual(self._a_str, 'hi, everybody')

    def test_log_wedged_worker(self):
        # self.queue() is supplied by the concrete subclass.
        starting_queue = self.queue()
        stopping_queue = self.queue()
        self.make_broker(starting_queue, stopping_queue)
        oc = outputcapture.OutputCapture()
        oc.capture_output()
        try:
            worker = self._broker.start_worker(0)
            # Wait until the worker has started; it then blocks on the
            # stopping queue while log_wedged_worker() is called.
            starting_queue.get()
            worker.log_wedged_worker('test_name')
            stopping_queue.put('')
            self._broker.post_message('stop')
            self._broker.run_message_loop()
            worker.join(0.5)
            self.assertFalse(worker.is_alive())
            self.assertTrue(self.is_done())
        finally:
            oc.restore_output()

    def test_unknown_message(self):
        self.make_broker()
        worker = self._broker.start_worker(0)
        self._broker.post_message('unknown')
        self._broker.run_message_loop()
        worker.join(0.5)

        self.assertTrue(self.is_done())
        self.assertFalse(worker.is_alive())
        # assertEqual rather than the deprecated assertEquals alias, matching
        # test_done() above.
        self.assertEqual(self._exception[0], ValueError)
        self.assertEqual(self._exception[1],
            "TestWorker/0: received message 'unknown' it couldn't handle")


class InlineBrokerTests(_TestsMixin, unittest.TestCase):
    """Runs the mixin tests against the 'inline' worker model."""

    def setUp(self):
        _TestsMixin.setUp(self)
        self._worker_model = 'inline'

    def test_log_wedged_worker(self):
        # Overrides the mixin test: here log_wedged_worker() is expected to
        # raise AssertionError.
        self.make_broker()
        worker = self._broker.start_worker(0)
        self.assertRaises(AssertionError, worker.log_wedged_worker, None)


# FIXME: https://bugs.webkit.org/show_bug.cgi?id=54520.
if multiprocessing and sys.platform != 'cygwin' and sys.platform != 'win32':

    class MultiProcessBrokerTests(_TestsMixin, unittest.TestCase):
        """Runs the mixin tests against the 'processes' worker model."""

        def setUp(self):
            _TestsMixin.setUp(self)
            self._worker_model = 'processes'

        def queue(self):
            """Return a new multiprocessing.Queue for the mixin tests."""
            return multiprocessing.Queue()


class ThreadedBrokerTests(_TestsMixin, unittest.TestCase):
    """Runs the mixin tests against the 'threads' worker model."""

    def setUp(self):
        _TestsMixin.setUp(self)
        self._worker_model = 'threads'

    def queue(self):
        """Return a new Queue.Queue for the mixin tests."""
        return Queue.Queue()


class FunctionsTest(unittest.TestCase):
    def test_runtime_options(self):
        # The runtime options must be accepted by optparse and parse cleanly
        # from an empty command line.
        parser = optparse.OptionParser(
            option_list=manager_worker_broker.runtime_options())
        parsed_options = parser.parse_args([])[0]
        self.assertTrue(parsed_options)


class InterfaceTest(unittest.TestCase):
    # These tests mostly exist to pacify coverage.

    # FIXME: There must be a better way to do this and also verify
    # that classes do implement every abstract method in an interface.
    def test_managerconnection_is_abstract(self):
        # Test that all the base class methods are abstract and have the
        # signature we expect.
        inline_broker = make_broker(self, 'inline')
        connection = manager_worker_broker._ManagerConnection(
            inline_broker._broker, None, self, None)
        self.assertRaises(NotImplementedError, connection.start_worker, 0)

    def test_workerconnection_is_abstract(self):
        # Test that all the base class methods are abstract and have the
        # signature we expect.
        inline_broker = make_broker(self, 'inline')
        connection = manager_worker_broker._WorkerConnection(
            inline_broker._broker, _TestWorker, 0, None)
        abstract_calls = (
            ('cancel', ()),
            ('is_alive', ()),
            ('join', (None,)),
            ('log_wedged_worker', (None,)),
        )
        for method_name, call_args in abstract_calls:
            self.assertRaises(NotImplementedError,
                              getattr(connection, method_name), *call_args)


if __name__ == '__main__':
    unittest.main()