1# Copyright (C) 2011 Google Inc. All rights reserved. 2# 3# Redistribution and use in source and binary forms, with or without 4# modification, are permitted provided that the following conditions are 5# met: 6# 7# * Redistributions of source code must retain the above copyright 8# notice, this list of conditions and the following disclaimer. 9# * Redistributions in binary form must reproduce the above 10# copyright notice, this list of conditions and the following disclaimer 11# in the documentation and/or other materials provided with the 12# distribution. 13# * Neither the name of Google Inc. nor the names of its 14# contributors may be used to endorse or promote products derived from 15# this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 18# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 19# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 20# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 21# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 22# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 23# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 24# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 25# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 26# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 27# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 28 29"""Module for handling messages and concurrency for run-webkit-tests. 30 31This module implements a message broker that connects the manager 32(TestRunner2) to the workers: it provides a messaging abstraction and 33message loops (building on top of message_broker2), and handles starting 34workers by launching threads and/or processes depending on the 35requested configuration. 36 37There are a lot of classes and objects involved in a fully connected system. 38They interact more or less like: 39 40TestRunner2 --> _InlineManager ---> _InlineWorker <-> Worker 41 ^ \ / ^ 42 | v v | 43 \-------------------- MessageBroker -------------/ 44""" 45 46import logging 47import optparse 48import printing 49import Queue 50import sys 51import thread 52import threading 53import time 54 55 56# Handle Python < 2.6 where multiprocessing isn't available. 57try: 58 import multiprocessing 59except ImportError: 60 multiprocessing = None 61 62 63from webkitpy.common.system import stack_utils 64from webkitpy.layout_tests import port 65from webkitpy.layout_tests.layout_package import message_broker2 66 67 68_log = logging.getLogger(__name__) 69 70# 71# Topic names for Manager <-> Worker messaging 72# 73MANAGER_TOPIC = 'managers' 74ANY_WORKER_TOPIC = 'workers' 75 76 77def runtime_options(): 78 """Return a list of optparse.Option objects for any runtime values used 79 by this module.""" 80 options = [ 81 optparse.make_option("--worker-model", action="store", 82 help=("controls worker model. Valid values are " 83 "'inline', 'threads', and 'processes'.")), 84 ] 85 return options 86 87 88def get(port, options, client, worker_class): 89 """Return a connection to a manager/worker message_broker 90 91 Args: 92 port - handle to layout_tests/port object for port-specific stuff 93 options - optparse argument for command-line options 94 client - message_broker2.BrokerClient implementation to dispatch 95 replies to. 96 worker_class - type of workers to create. This class must implement 97 the methods in AbstractWorker. 98 Returns: 99 A handle to an object that will talk to a message broker configured 100 for the normal manager/worker communication. 101 """ 102 worker_model = options.worker_model 103 if worker_model == 'inline': 104 queue_class = Queue.Queue 105 manager_class = _InlineManager 106 elif worker_model == 'threads': 107 queue_class = Queue.Queue 108 manager_class = _ThreadedManager 109 elif worker_model == 'processes' and multiprocessing: 110 queue_class = multiprocessing.Queue 111 manager_class = _MultiProcessManager 112 else: 113 raise ValueError("unsupported value for --worker-model: %s" % 114 worker_model) 115 116 broker = message_broker2.Broker(options, queue_class) 117 return manager_class(broker, port, options, client, worker_class) 118 119 120class AbstractWorker(message_broker2.BrokerClient): 121 def __init__(self, broker_connection, worker_number, options): 122 """The constructor should be used to do any simple initialization 123 necessary, but should not do anything that creates data structures 124 that cannot be Pickled or sent across processes (like opening 125 files or sockets). Complex initialization should be done at the 126 start of the run() call. 127 128 Args: 129 broker_connection - handle to the BrokerConnection object creating 130 the worker and that can be used for messaging. 131 worker_number - identifier for this particular worker 132 options - command-line argument object from optparse""" 133 134 raise NotImplementedError 135 136 def run(self, port): 137 """Callback for the worker to start executing. Typically does any 138 remaining initialization and then calls broker_connection.run_message_loop().""" 139 raise NotImplementedError 140 141 def cancel(self): 142 """Called when possible to indicate to the worker to stop processing 143 messages and shut down. Note that workers may be stopped without this 144 method being called, so clients should not rely solely on this.""" 145 raise NotImplementedError 146 147 148class _ManagerConnection(message_broker2.BrokerConnection): 149 def __init__(self, broker, options, client, worker_class): 150 """Base initialization for all Manager objects. 151 152 Args: 153 broker: handle to the message_broker2 object 154 options: command line options object 155 client: callback object (the caller) 156 worker_class: class object to use to create workers. 157 """ 158 message_broker2.BrokerConnection.__init__(self, broker, client, 159 MANAGER_TOPIC, ANY_WORKER_TOPIC) 160 self._options = options 161 self._worker_class = worker_class 162 163 def start_worker(self, worker_number): 164 raise NotImplementedError 165 166 167class _InlineManager(_ManagerConnection): 168 def __init__(self, broker, port, options, client, worker_class): 169 _ManagerConnection.__init__(self, broker, options, client, worker_class) 170 self._port = port 171 self._inline_worker = None 172 173 def start_worker(self, worker_number): 174 self._inline_worker = _InlineWorkerConnection(self._broker, self._port, 175 self._client, self._worker_class, worker_number) 176 return self._inline_worker 177 178 def run_message_loop(self, delay_secs=None): 179 # Note that delay_secs is ignored in this case since we can't easily 180 # implement it. 181 self._inline_worker.run() 182 self._broker.run_all_pending(MANAGER_TOPIC, self._client) 183 184 185class _ThreadedManager(_ManagerConnection): 186 def __init__(self, broker, port, options, client, worker_class): 187 _ManagerConnection.__init__(self, broker, options, client, worker_class) 188 self._port = port 189 190 def start_worker(self, worker_number): 191 worker_connection = _ThreadedWorkerConnection(self._broker, self._port, 192 self._worker_class, worker_number) 193 worker_connection.start() 194 return worker_connection 195 196 197class _MultiProcessManager(_ManagerConnection): 198 def __init__(self, broker, port, options, client, worker_class): 199 # Note that this class does not keep a handle to the actual port 200 # object, because it isn't Picklable. Instead it keeps the port 201 # name and recreates the port in the child process from the name 202 # and options. 203 _ManagerConnection.__init__(self, broker, options, client, worker_class) 204 self._platform_name = port.real_name() 205 206 def start_worker(self, worker_number): 207 worker_connection = _MultiProcessWorkerConnection(self._broker, self._platform_name, 208 self._worker_class, worker_number, self._options) 209 worker_connection.start() 210 return worker_connection 211 212 213class _WorkerConnection(message_broker2.BrokerConnection): 214 def __init__(self, broker, worker_class, worker_number, options): 215 self._client = worker_class(self, worker_number, options) 216 self.name = self._client.name() 217 message_broker2.BrokerConnection.__init__(self, broker, self._client, 218 ANY_WORKER_TOPIC, MANAGER_TOPIC) 219 220 def cancel(self): 221 raise NotImplementedError 222 223 def is_alive(self): 224 raise NotImplementedError 225 226 def join(self, timeout): 227 raise NotImplementedError 228 229 def log_wedged_worker(self, test_name): 230 raise NotImplementedError 231 232 def yield_to_broker(self): 233 pass 234 235 236class _InlineWorkerConnection(_WorkerConnection): 237 def __init__(self, broker, port, manager_client, worker_class, worker_number): 238 _WorkerConnection.__init__(self, broker, worker_class, worker_number, port._options) 239 self._alive = False 240 self._port = port 241 self._manager_client = manager_client 242 243 def cancel(self): 244 self._client.cancel() 245 246 def is_alive(self): 247 return self._alive 248 249 def join(self, timeout): 250 assert not self._alive 251 252 def log_wedged_worker(self, test_name): 253 assert False, "_InlineWorkerConnection.log_wedged_worker() called" 254 255 def run(self): 256 self._alive = True 257 self._client.run(self._port) 258 self._alive = False 259 260 def yield_to_broker(self): 261 self._broker.run_all_pending(MANAGER_TOPIC, self._manager_client) 262 263 264class _Thread(threading.Thread): 265 def __init__(self, worker_connection, port, client): 266 threading.Thread.__init__(self) 267 self._worker_connection = worker_connection 268 self._port = port 269 self._client = client 270 271 def cancel(self): 272 return self._client.cancel() 273 274 def log_wedged_worker(self, test_name): 275 stack_utils.log_thread_state(_log.error, self._client.name(), self.ident, " is wedged on test %s" % test_name) 276 277 def run(self): 278 # FIXME: We can remove this once everyone is on 2.6. 279 if not hasattr(self, 'ident'): 280 self.ident = thread.get_ident() 281 self._client.run(self._port) 282 283 284class _ThreadedWorkerConnection(_WorkerConnection): 285 def __init__(self, broker, port, worker_class, worker_number): 286 _WorkerConnection.__init__(self, broker, worker_class, worker_number, port._options) 287 self._thread = _Thread(self, port, self._client) 288 289 def cancel(self): 290 return self._thread.cancel() 291 292 def is_alive(self): 293 # FIXME: Change this to is_alive once everyone is on 2.6. 294 return self._thread.isAlive() 295 296 def join(self, timeout): 297 return self._thread.join(timeout) 298 299 def log_wedged_worker(self, test_name): 300 return self._thread.log_wedged_worker(test_name) 301 302 def start(self): 303 self._thread.start() 304 305 306if multiprocessing: 307 308 class _Process(multiprocessing.Process): 309 def __init__(self, worker_connection, platform_name, options, client): 310 multiprocessing.Process.__init__(self) 311 self._worker_connection = worker_connection 312 self._platform_name = platform_name 313 self._options = options 314 self._client = client 315 316 def log_wedged_worker(self, test_name): 317 _log.error("%s (pid %d) is wedged on test %s" % (self.name, self.pid, test_name)) 318 319 def run(self): 320 options = self._options 321 port_obj = port.get(self._platform_name, options) 322 # FIXME: this won't work if the calling process is logging 323 # somewhere other than sys.stderr and sys.stdout, but I'm not sure 324 # if this will be an issue in practice. 325 printer = printing.Printer(port_obj, options, sys.stderr, sys.stdout, 326 int(options.child_processes), options.experimental_fully_parallel) 327 self._client.run(port_obj) 328 printer.cleanup() 329 330 331class _MultiProcessWorkerConnection(_WorkerConnection): 332 def __init__(self, broker, platform_name, worker_class, worker_number, options): 333 _WorkerConnection.__init__(self, broker, worker_class, worker_number, options) 334 self._proc = _Process(self, platform_name, options, self._client) 335 336 def cancel(self): 337 return self._proc.terminate() 338 339 def is_alive(self): 340 return self._proc.is_alive() 341 342 def join(self, timeout): 343 return self._proc.join(timeout) 344 345 def log_wedged_worker(self, test_name): 346 return self._proc.log_wedged_worker(test_name) 347 348 def start(self): 349 self._proc.start() 350