• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (C) 2011 Google Inc. All rights reserved.
2#
3# Redistribution and use in source and binary forms, with or without
4# modification, are permitted provided that the following conditions are
5# met:
6#
7#     * Redistributions of source code must retain the above copyright
8# notice, this list of conditions and the following disclaimer.
9#     * Redistributions in binary form must reproduce the above
10# copyright notice, this list of conditions and the following disclaimer
11# in the documentation and/or other materials provided with the
12# distribution.
13#     * Neither the name of Google Inc. nor the names of its
14# contributors may be used to endorse or promote products derived from
15# this software without specific prior written permission.
16#
17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
21# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
29"""Module for handling messages and concurrency for run-webkit-tests.
30
31This module implements a message broker that connects the manager
32(TestRunner2) to the workers: it provides a messaging abstraction and
33message loops (building on top of message_broker2), and handles starting
34workers by launching threads and/or processes depending on the
35requested configuration.
36
37There are a lot of classes and objects involved in a fully connected system.
38They interact more or less like:
39
40TestRunner2  --> _InlineManager ---> _InlineWorker <-> Worker
41     ^                    \               /              ^
42     |                     v             v               |
43     \--------------------  MessageBroker   -------------/
44"""
45
46import logging
47import optparse
48import printing
49import Queue
50import sys
51import thread
52import threading
53import time
54
55
56# Handle Python < 2.6 where multiprocessing isn't available.
57try:
58    import multiprocessing
59except ImportError:
60    multiprocessing = None
61
62
63from webkitpy.common.system import stack_utils
64from webkitpy.layout_tests import port
65from webkitpy.layout_tests.layout_package import message_broker2
66
67
68_log = logging.getLogger(__name__)
69
70#
71# Topic names for Manager <-> Worker messaging
72#
73MANAGER_TOPIC = 'managers'
74ANY_WORKER_TOPIC = 'workers'
75
76
77def runtime_options():
78    """Return a list of optparse.Option objects for any runtime values used
79    by this module."""
80    options = [
81        optparse.make_option("--worker-model", action="store",
82            help=("controls worker model. Valid values are "
83            "'inline', 'threads', and 'processes'.")),
84    ]
85    return options
86
87
88def get(port, options, client, worker_class):
89    """Return a connection to a manager/worker message_broker
90
91    Args:
92        port - handle to layout_tests/port object for port-specific stuff
93        options - optparse argument for command-line options
94        client - message_broker2.BrokerClient implementation to dispatch
95            replies to.
96        worker_class - type of workers to create. This class must implement
97            the methods in AbstractWorker.
98    Returns:
99        A handle to an object that will talk to a message broker configured
100        for the normal manager/worker communication.
101    """
102    worker_model = options.worker_model
103    if worker_model == 'inline':
104        queue_class = Queue.Queue
105        manager_class = _InlineManager
106    elif worker_model == 'threads':
107        queue_class = Queue.Queue
108        manager_class = _ThreadedManager
109    elif worker_model == 'processes' and multiprocessing:
110        queue_class = multiprocessing.Queue
111        manager_class = _MultiProcessManager
112    else:
113        raise ValueError("unsupported value for --worker-model: %s" %
114                         worker_model)
115
116    broker = message_broker2.Broker(options, queue_class)
117    return manager_class(broker, port, options, client, worker_class)
118
119
120class AbstractWorker(message_broker2.BrokerClient):
121    def __init__(self, broker_connection, worker_number, options):
122        """The constructor should be used to do any simple initialization
123        necessary, but should not do anything that creates data structures
124        that cannot be Pickled or sent across processes (like opening
125        files or sockets). Complex initialization should be done at the
126        start of the run() call.
127
128        Args:
129            broker_connection - handle to the BrokerConnection object creating
130                the worker and that can be used for messaging.
131            worker_number - identifier for this particular worker
132            options - command-line argument object from optparse"""
133
134        raise NotImplementedError
135
136    def run(self, port):
137        """Callback for the worker to start executing. Typically does any
138        remaining initialization and then calls broker_connection.run_message_loop()."""
139        raise NotImplementedError
140
141    def cancel(self):
142        """Called when possible to indicate to the worker to stop processing
143        messages and shut down. Note that workers may be stopped without this
144        method being called, so clients should not rely solely on this."""
145        raise NotImplementedError
146
147
148class _ManagerConnection(message_broker2.BrokerConnection):
149    def __init__(self, broker, options, client, worker_class):
150        """Base initialization for all Manager objects.
151
152        Args:
153            broker: handle to the message_broker2 object
154            options: command line options object
155            client: callback object (the caller)
156            worker_class: class object to use to create workers.
157        """
158        message_broker2.BrokerConnection.__init__(self, broker, client,
159            MANAGER_TOPIC, ANY_WORKER_TOPIC)
160        self._options = options
161        self._worker_class = worker_class
162
163    def start_worker(self, worker_number):
164        raise NotImplementedError
165
166
167class _InlineManager(_ManagerConnection):
168    def __init__(self, broker, port, options, client, worker_class):
169        _ManagerConnection.__init__(self, broker, options, client, worker_class)
170        self._port = port
171        self._inline_worker = None
172
173    def start_worker(self, worker_number):
174        self._inline_worker = _InlineWorkerConnection(self._broker, self._port,
175            self._client, self._worker_class, worker_number)
176        return self._inline_worker
177
178    def run_message_loop(self, delay_secs=None):
179        # Note that delay_secs is ignored in this case since we can't easily
180        # implement it.
181        self._inline_worker.run()
182        self._broker.run_all_pending(MANAGER_TOPIC, self._client)
183
184
185class _ThreadedManager(_ManagerConnection):
186    def __init__(self, broker, port, options, client, worker_class):
187        _ManagerConnection.__init__(self, broker, options, client, worker_class)
188        self._port = port
189
190    def start_worker(self, worker_number):
191        worker_connection = _ThreadedWorkerConnection(self._broker, self._port,
192            self._worker_class, worker_number)
193        worker_connection.start()
194        return worker_connection
195
196
197class _MultiProcessManager(_ManagerConnection):
198    def __init__(self, broker, port, options, client, worker_class):
199        # Note that this class does not keep a handle to the actual port
200        # object, because it isn't Picklable. Instead it keeps the port
201        # name and recreates the port in the child process from the name
202        # and options.
203        _ManagerConnection.__init__(self, broker, options, client, worker_class)
204        self._platform_name = port.real_name()
205
206    def start_worker(self, worker_number):
207        worker_connection = _MultiProcessWorkerConnection(self._broker, self._platform_name,
208            self._worker_class, worker_number, self._options)
209        worker_connection.start()
210        return worker_connection
211
212
213class _WorkerConnection(message_broker2.BrokerConnection):
214    def __init__(self, broker, worker_class, worker_number, options):
215        self._client = worker_class(self, worker_number, options)
216        self.name = self._client.name()
217        message_broker2.BrokerConnection.__init__(self, broker, self._client,
218                                                  ANY_WORKER_TOPIC, MANAGER_TOPIC)
219
220    def cancel(self):
221        raise NotImplementedError
222
223    def is_alive(self):
224        raise NotImplementedError
225
226    def join(self, timeout):
227        raise NotImplementedError
228
229    def log_wedged_worker(self, test_name):
230        raise NotImplementedError
231
232    def yield_to_broker(self):
233        pass
234
235
236class _InlineWorkerConnection(_WorkerConnection):
237    def __init__(self, broker, port, manager_client, worker_class, worker_number):
238        _WorkerConnection.__init__(self, broker, worker_class, worker_number, port._options)
239        self._alive = False
240        self._port = port
241        self._manager_client = manager_client
242
243    def cancel(self):
244        self._client.cancel()
245
246    def is_alive(self):
247        return self._alive
248
249    def join(self, timeout):
250        assert not self._alive
251
252    def log_wedged_worker(self, test_name):
253        assert False, "_InlineWorkerConnection.log_wedged_worker() called"
254
255    def run(self):
256        self._alive = True
257        self._client.run(self._port)
258        self._alive = False
259
260    def yield_to_broker(self):
261        self._broker.run_all_pending(MANAGER_TOPIC, self._manager_client)
262
263
264class _Thread(threading.Thread):
265    def __init__(self, worker_connection, port, client):
266        threading.Thread.__init__(self)
267        self._worker_connection = worker_connection
268        self._port = port
269        self._client = client
270
271    def cancel(self):
272        return self._client.cancel()
273
274    def log_wedged_worker(self, test_name):
275        stack_utils.log_thread_state(_log.error, self._client.name(), self.ident, " is wedged on test %s" % test_name)
276
277    def run(self):
278        # FIXME: We can remove this once everyone is on 2.6.
279        if not hasattr(self, 'ident'):
280            self.ident = thread.get_ident()
281        self._client.run(self._port)
282
283
284class _ThreadedWorkerConnection(_WorkerConnection):
285    def __init__(self, broker, port, worker_class, worker_number):
286        _WorkerConnection.__init__(self, broker, worker_class, worker_number, port._options)
287        self._thread = _Thread(self, port, self._client)
288
289    def cancel(self):
290        return self._thread.cancel()
291
292    def is_alive(self):
293        # FIXME: Change this to is_alive once everyone is on 2.6.
294        return self._thread.isAlive()
295
296    def join(self, timeout):
297        return self._thread.join(timeout)
298
299    def log_wedged_worker(self, test_name):
300        return self._thread.log_wedged_worker(test_name)
301
302    def start(self):
303        self._thread.start()
304
305
306if multiprocessing:
307
308    class _Process(multiprocessing.Process):
309        def __init__(self, worker_connection, platform_name, options, client):
310            multiprocessing.Process.__init__(self)
311            self._worker_connection = worker_connection
312            self._platform_name = platform_name
313            self._options = options
314            self._client = client
315
316        def log_wedged_worker(self, test_name):
317            _log.error("%s (pid %d) is wedged on test %s" % (self.name, self.pid, test_name))
318
319        def run(self):
320            options = self._options
321            port_obj = port.get(self._platform_name, options)
322            # FIXME: this won't work if the calling process is logging
323            # somewhere other than sys.stderr and sys.stdout, but I'm not sure
324            # if this will be an issue in practice.
325            printer = printing.Printer(port_obj, options, sys.stderr, sys.stdout,
326                int(options.child_processes), options.experimental_fully_parallel)
327            self._client.run(port_obj)
328            printer.cleanup()
329
330
331class _MultiProcessWorkerConnection(_WorkerConnection):
332    def __init__(self, broker, platform_name, worker_class, worker_number, options):
333        _WorkerConnection.__init__(self, broker, worker_class, worker_number, options)
334        self._proc = _Process(self, platform_name, options, self._client)
335
336    def cancel(self):
337        return self._proc.terminate()
338
339    def is_alive(self):
340        return self._proc.is_alive()
341
342    def join(self, timeout):
343        return self._proc.join(timeout)
344
345    def log_wedged_worker(self, test_name):
346        return self._proc.log_wedged_worker(test_name)
347
348    def start(self):
349        self._proc.start()
350