• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2013 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""A "Test Server Spawner" that handles killing/stopping per-test test servers.
6
7It's used to accept requests from the device to spawn and kill instances of the
8chrome test server on the host.
9"""
10# pylint: disable=W0702
11
12import BaseHTTPServer
13import json
14import logging
15import os
16import select
17import struct
18import subprocess
19import sys
20import threading
21import time
22import urlparse
23
24from devil.android import forwarder
25from devil.android import ports
26
27from pylib import constants
28from pylib.constants import host_paths
29
30
31# Path that are needed to import necessary modules when launching a testserver.
32os.environ['PYTHONPATH'] = os.environ.get('PYTHONPATH', '') + (':%s:%s:%s:%s:%s'
33    % (os.path.join(host_paths.DIR_SOURCE_ROOT, 'third_party'),
34       os.path.join(host_paths.DIR_SOURCE_ROOT, 'third_party', 'tlslite'),
35       os.path.join(host_paths.DIR_SOURCE_ROOT, 'third_party', 'pyftpdlib',
36                    'src'),
37       os.path.join(host_paths.DIR_SOURCE_ROOT, 'net', 'tools', 'testserver'),
38       os.path.join(host_paths.DIR_SOURCE_ROOT, 'sync', 'tools', 'testserver')))
39
40
41SERVER_TYPES = {
42    'http': '',
43    'ftp': '-f',
44    'sync': '',  # Sync uses its own script, and doesn't take a server type arg.
45    'tcpecho': '--tcp-echo',
46    'udpecho': '--udp-echo',
47}
48
49
50# The timeout (in seconds) of starting up the Python test server.
51TEST_SERVER_STARTUP_TIMEOUT = 10
52
53def _WaitUntil(predicate, max_attempts=5):
54  """Blocks until the provided predicate (function) is true.
55
56  Returns:
57    Whether the provided predicate was satisfied once (before the timeout).
58  """
59  sleep_time_sec = 0.025
60  for _ in xrange(1, max_attempts):
61    if predicate():
62      return True
63    time.sleep(sleep_time_sec)
64    sleep_time_sec = min(1, sleep_time_sec * 2)  # Don't wait more than 1 sec.
65  return False
66
67
68def _CheckPortAvailable(port):
69  """Returns True if |port| is available."""
70  return _WaitUntil(lambda: ports.IsHostPortAvailable(port))
71
72
73def _CheckPortNotAvailable(port):
74  """Returns True if |port| is not available."""
75  return _WaitUntil(lambda: not ports.IsHostPortAvailable(port))
76
77
78def _CheckDevicePortStatus(device, port):
79  """Returns whether the provided port is used."""
80  return _WaitUntil(lambda: ports.IsDevicePortUsed(device, port))
81
82
83def _GetServerTypeCommandLine(server_type):
84  """Returns the command-line by the given server type.
85
86  Args:
87    server_type: the server type to be used (e.g. 'http').
88
89  Returns:
90    A string containing the command-line argument.
91  """
92  if server_type not in SERVER_TYPES:
93    raise NotImplementedError('Unknown server type: %s' % server_type)
94  if server_type == 'udpecho':
95    raise Exception('Please do not run UDP echo tests because we do not have '
96                    'a UDP forwarder tool.')
97  return SERVER_TYPES[server_type]
98
99
100class TestServerThread(threading.Thread):
101  """A thread to run the test server in a separate process."""
102
103  def __init__(self, ready_event, arguments, device, tool):
104    """Initialize TestServerThread with the following argument.
105
106    Args:
107      ready_event: event which will be set when the test server is ready.
108      arguments: dictionary of arguments to run the test server.
109      device: An instance of DeviceUtils.
110      tool: instance of runtime error detection tool.
111    """
112    threading.Thread.__init__(self)
113    self.wait_event = threading.Event()
114    self.stop_flag = False
115    self.ready_event = ready_event
116    self.ready_event.clear()
117    self.arguments = arguments
118    self.device = device
119    self.tool = tool
120    self.test_server_process = None
121    self.is_ready = False
122    self.host_port = self.arguments['port']
123    assert isinstance(self.host_port, int)
124    # The forwarder device port now is dynamically allocated.
125    self.forwarder_device_port = 0
126    # Anonymous pipe in order to get port info from test server.
127    self.pipe_in = None
128    self.pipe_out = None
129    self.process = None
130    self.command_line = []
131
132  def _WaitToStartAndGetPortFromTestServer(self):
133    """Waits for the Python test server to start and gets the port it is using.
134
135    The port information is passed by the Python test server with a pipe given
136    by self.pipe_out. It is written as a result to |self.host_port|.
137
138    Returns:
139      Whether the port used by the test server was successfully fetched.
140    """
141    assert self.host_port == 0 and self.pipe_out and self.pipe_in
142    (in_fds, _, _) = select.select([self.pipe_in, ], [], [],
143                                   TEST_SERVER_STARTUP_TIMEOUT)
144    if len(in_fds) == 0:
145      logging.error('Failed to wait to the Python test server to be started.')
146      return False
147    # First read the data length as an unsigned 4-byte value.  This
148    # is _not_ using network byte ordering since the Python test server packs
149    # size as native byte order and all Chromium platforms so far are
150    # configured to use little-endian.
151    # TODO(jnd): Change the Python test server and local_test_server_*.cc to
152    # use a unified byte order (either big-endian or little-endian).
153    data_length = os.read(self.pipe_in, struct.calcsize('=L'))
154    if data_length:
155      (data_length,) = struct.unpack('=L', data_length)
156      assert data_length
157    if not data_length:
158      logging.error('Failed to get length of server data.')
159      return False
160    port_json = os.read(self.pipe_in, data_length)
161    if not port_json:
162      logging.error('Failed to get server data.')
163      return False
164    logging.info('Got port json data: %s', port_json)
165    port_json = json.loads(port_json)
166    if port_json.has_key('port') and isinstance(port_json['port'], int):
167      self.host_port = port_json['port']
168      return _CheckPortNotAvailable(self.host_port)
169    logging.error('Failed to get port information from the server data.')
170    return False
171
172  def _GenerateCommandLineArguments(self):
173    """Generates the command line to run the test server.
174
175    Note that all options are processed by following the definitions in
176    testserver.py.
177    """
178    if self.command_line:
179      return
180
181    args_copy = dict(self.arguments)
182
183    # Translate the server type.
184    type_cmd = _GetServerTypeCommandLine(args_copy.pop('server-type'))
185    if type_cmd:
186      self.command_line.append(type_cmd)
187
188    # Use a pipe to get the port given by the instance of Python test server
189    # if the test does not specify the port.
190    assert self.host_port == args_copy['port']
191    if self.host_port == 0:
192      (self.pipe_in, self.pipe_out) = os.pipe()
193      self.command_line.append('--startup-pipe=%d' % self.pipe_out)
194
195    # Pass the remaining arguments as-is.
196    for key, values in args_copy.iteritems():
197      if not isinstance(values, list):
198        values = [values]
199      for value in values:
200        if value is None:
201          self.command_line.append('--%s' % key)
202        else:
203          self.command_line.append('--%s=%s' % (key, value))
204
205  def _CloseUnnecessaryFDsForTestServerProcess(self):
206    # This is required to avoid subtle deadlocks that could be caused by the
207    # test server child process inheriting undesirable file descriptors such as
208    # file lock file descriptors.
209    for fd in xrange(0, 1024):
210      if fd != self.pipe_out:
211        try:
212          os.close(fd)
213        except:
214          pass
215
216  def run(self):
217    logging.info('Start running the thread!')
218    self.wait_event.clear()
219    self._GenerateCommandLineArguments()
220    command = host_paths.DIR_SOURCE_ROOT
221    if self.arguments['server-type'] == 'sync':
222      command = [os.path.join(command, 'sync', 'tools', 'testserver',
223                              'sync_testserver.py')] + self.command_line
224    else:
225      command = [os.path.join(command, 'net', 'tools', 'testserver',
226                              'testserver.py')] + self.command_line
227    logging.info('Running: %s', command)
228
229    # Disable PYTHONUNBUFFERED because it has a bad interaction with the
230    # testserver. Remove once this interaction is fixed.
231    unbuf = os.environ.pop('PYTHONUNBUFFERED', None)
232
233    # Pass DIR_SOURCE_ROOT as the child's working directory so that relative
234    # paths in the arguments are resolved correctly.
235    self.process = subprocess.Popen(
236        command, preexec_fn=self._CloseUnnecessaryFDsForTestServerProcess,
237        cwd=host_paths.DIR_SOURCE_ROOT)
238    if unbuf:
239      os.environ['PYTHONUNBUFFERED'] = unbuf
240    if self.process:
241      if self.pipe_out:
242        self.is_ready = self._WaitToStartAndGetPortFromTestServer()
243      else:
244        self.is_ready = _CheckPortNotAvailable(self.host_port)
245    if self.is_ready:
246      forwarder.Forwarder.Map([(0, self.host_port)], self.device, self.tool)
247      # Check whether the forwarder is ready on the device.
248      self.is_ready = False
249      device_port = forwarder.Forwarder.DevicePortForHostPort(self.host_port)
250      if device_port and _CheckDevicePortStatus(self.device, device_port):
251        self.is_ready = True
252        self.forwarder_device_port = device_port
253    # Wake up the request handler thread.
254    self.ready_event.set()
255    # Keep thread running until Stop() gets called.
256    _WaitUntil(lambda: self.stop_flag, max_attempts=sys.maxint)
257    if self.process.poll() is None:
258      self.process.kill()
259    forwarder.Forwarder.UnmapDevicePort(self.forwarder_device_port, self.device)
260    self.process = None
261    self.is_ready = False
262    if self.pipe_out:
263      os.close(self.pipe_in)
264      os.close(self.pipe_out)
265      self.pipe_in = None
266      self.pipe_out = None
267    logging.info('Test-server has died.')
268    self.wait_event.set()
269
270  def Stop(self):
271    """Blocks until the loop has finished.
272
273    Note that this must be called in another thread.
274    """
275    if not self.process:
276      return
277    self.stop_flag = True
278    self.wait_event.wait()
279
280
281class SpawningServerRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
282  """A handler used to process http GET/POST request."""
283
284  def _SendResponse(self, response_code, response_reason, additional_headers,
285                    contents):
286    """Generates a response sent to the client from the provided parameters.
287
288    Args:
289      response_code: number of the response status.
290      response_reason: string of reason description of the response.
291      additional_headers: dict of additional headers. Each key is the name of
292                          the header, each value is the content of the header.
293      contents: string of the contents we want to send to client.
294    """
295    self.send_response(response_code, response_reason)
296    self.send_header('Content-Type', 'text/html')
297    # Specify the content-length as without it the http(s) response will not
298    # be completed properly (and the browser keeps expecting data).
299    self.send_header('Content-Length', len(contents))
300    for header_name in additional_headers:
301      self.send_header(header_name, additional_headers[header_name])
302    self.end_headers()
303    self.wfile.write(contents)
304    self.wfile.flush()
305
306  def _StartTestServer(self):
307    """Starts the test server thread."""
308    logging.info('Handling request to spawn a test server.')
309    content_type = self.headers.getheader('content-type')
310    if content_type != 'application/json':
311      raise Exception('Bad content-type for start request.')
312    content_length = self.headers.getheader('content-length')
313    if not content_length:
314      content_length = 0
315    try:
316      content_length = int(content_length)
317    except:
318      raise Exception('Bad content-length for start request.')
319    logging.info(content_length)
320    test_server_argument_json = self.rfile.read(content_length)
321    logging.info(test_server_argument_json)
322    assert not self.server.test_server_instance
323    ready_event = threading.Event()
324    self.server.test_server_instance = TestServerThread(
325        ready_event,
326        json.loads(test_server_argument_json),
327        self.server.device,
328        self.server.tool)
329    self.server.test_server_instance.setDaemon(True)
330    self.server.test_server_instance.start()
331    ready_event.wait()
332    if self.server.test_server_instance.is_ready:
333      self._SendResponse(200, 'OK', {}, json.dumps(
334          {'port': self.server.test_server_instance.forwarder_device_port,
335           'message': 'started'}))
336      logging.info('Test server is running on port: %d.',
337                   self.server.test_server_instance.host_port)
338    else:
339      self.server.test_server_instance.Stop()
340      self.server.test_server_instance = None
341      self._SendResponse(500, 'Test Server Error.', {}, '')
342      logging.info('Encounter problem during starting a test server.')
343
344  def _KillTestServer(self):
345    """Stops the test server instance."""
346    # There should only ever be one test server at a time. This may do the
347    # wrong thing if we try and start multiple test servers.
348    if not self.server.test_server_instance:
349      return
350    port = self.server.test_server_instance.host_port
351    logging.info('Handling request to kill a test server on port: %d.', port)
352    self.server.test_server_instance.Stop()
353    # Make sure the status of test server is correct before sending response.
354    if _CheckPortAvailable(port):
355      self._SendResponse(200, 'OK', {}, 'killed')
356      logging.info('Test server on port %d is killed', port)
357    else:
358      self._SendResponse(500, 'Test Server Error.', {}, '')
359      logging.info('Encounter problem during killing a test server.')
360    self.server.test_server_instance = None
361
362  def do_POST(self):
363    parsed_path = urlparse.urlparse(self.path)
364    action = parsed_path.path
365    logging.info('Action for POST method is: %s.', action)
366    if action == '/start':
367      self._StartTestServer()
368    else:
369      self._SendResponse(400, 'Unknown request.', {}, '')
370      logging.info('Encounter unknown request: %s.', action)
371
372  def do_GET(self):
373    parsed_path = urlparse.urlparse(self.path)
374    action = parsed_path.path
375    params = urlparse.parse_qs(parsed_path.query, keep_blank_values=1)
376    logging.info('Action for GET method is: %s.', action)
377    for param in params:
378      logging.info('%s=%s', param, params[param][0])
379    if action == '/kill':
380      self._KillTestServer()
381    elif action == '/ping':
382      # The ping handler is used to check whether the spawner server is ready
383      # to serve the requests. We don't need to test the status of the test
384      # server when handling ping request.
385      self._SendResponse(200, 'OK', {}, 'ready')
386      logging.info('Handled ping request and sent response.')
387    else:
388      self._SendResponse(400, 'Unknown request', {}, '')
389      logging.info('Encounter unknown request: %s.', action)
390
391
392class SpawningServer(object):
393  """The class used to start/stop a http server."""
394
395  def __init__(self, test_server_spawner_port, device, tool):
396    logging.info('Creating new spawner on port: %d.', test_server_spawner_port)
397    self.server = BaseHTTPServer.HTTPServer(('', test_server_spawner_port),
398                                            SpawningServerRequestHandler)
399    self.server.device = device
400    self.server.tool = tool
401    self.server.test_server_instance = None
402    self.server.build_type = constants.GetBuildType()
403
404  def _Listen(self):
405    logging.info('Starting test server spawner')
406    self.server.serve_forever()
407
408  def Start(self):
409    """Starts the test server spawner."""
410    listener_thread = threading.Thread(target=self._Listen)
411    listener_thread.setDaemon(True)
412    listener_thread.start()
413
414  def Stop(self):
415    """Stops the test server spawner.
416
417    Also cleans the server state.
418    """
419    self.CleanupState()
420    self.server.shutdown()
421
422  def CleanupState(self):
423    """Cleans up the spawning server state.
424
425    This should be called if the test server spawner is reused,
426    to avoid sharing the test server instance.
427    """
428    if self.server.test_server_instance:
429      self.server.test_server_instance.Stop()
430      self.server.test_server_instance = None
431