• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2013 The Chromium Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""A "Test Server Spawner" that handles killing/stopping per-test test servers.
6
7It's used to accept requests from the device to spawn and kill instances of the
8chrome test server on the host.
9"""
10
11import BaseHTTPServer
12import json
13import logging
14import os
15import select
16import struct
17import subprocess
18import sys
19import threading
20import time
21import urlparse
22
23import constants
24import ports
25
26from pylib.forwarder import Forwarder
27
# Paths that are needed to import necessary modules when launching a
# testserver. They are appended, ':'-separated, to whatever PYTHONPATH value
# the environment already carries (or to the empty string when it is unset).
os.environ['PYTHONPATH'] = ':'.join([
    os.environ.get('PYTHONPATH', ''),
    os.path.join(constants.DIR_SOURCE_ROOT, 'third_party'),
    os.path.join(constants.DIR_SOURCE_ROOT, 'third_party', 'tlslite'),
    os.path.join(constants.DIR_SOURCE_ROOT, 'third_party', 'pyftpdlib', 'src'),
    os.path.join(constants.DIR_SOURCE_ROOT, 'net', 'tools', 'testserver'),
    os.path.join(constants.DIR_SOURCE_ROOT, 'sync', 'tools', 'testserver'),
])
36
37
# Maps each known server type name to the command-line flag associated with
# it. An empty string means there is no flag for that type.
SERVER_TYPES = dict(
    http='',
    ftp='-f',
    sync='',  # Sync uses its own script, and doesn't take a server type arg.
    tcpecho='--tcp-echo',
    udpecho='--udp-echo',
)


# Timeout, in seconds, for the Python test server to start up.
TEST_SERVER_STARTUP_TIMEOUT = 10
49
50def _WaitUntil(predicate, max_attempts=5):
51  """Blocks until the provided predicate (function) is true.
52
53  Returns:
54    Whether the provided predicate was satisfied once (before the timeout).
55  """
56  sleep_time_sec = 0.025
57  for attempt in xrange(1, max_attempts):
58    if predicate():
59      return True
60    time.sleep(sleep_time_sec)
61    sleep_time_sec = min(1, sleep_time_sec * 2)  # Don't wait more than 1 sec.
62  return False
63
64
def _CheckPortStatus(port, expected_status):
  """Polls the host port until its usage state matches expected_status.

  Args:
    port: the port number.
    expected_status: the boolean that ports.IsHostPortUsed(port) is expected
        to return.

  Returns:
    True if the expected status was observed by _WaitUntil, False otherwise.
  """
  def _HasExpectedStatus():
    return ports.IsHostPortUsed(port) == expected_status

  return _WaitUntil(_HasExpectedStatus)
76
77
def _CheckDevicePortStatus(adb, port):
  """Polls until ports.IsDevicePortUsed reports the device port as used.

  Args:
    adb: object handed through to ports.IsDevicePortUsed.
    port: the device port number to check.

  Returns:
    True if the port was reported as used by _WaitUntil, False otherwise.
  """
  def _IsUsed():
    return ports.IsDevicePortUsed(adb, port)

  return _WaitUntil(_IsUsed)
81
82
def _GetServerTypeCommandLine(server_type):
  """Looks up the command-line flag for the given server type.

  Args:
    server_type: the server type to be used (e.g. 'http').

  Returns:
    The flag string registered in SERVER_TYPES for that type; it may be empty.

  Raises:
    NotImplementedError: if server_type is not a key of SERVER_TYPES.
    Exception: if server_type is 'udpecho', which is deliberately rejected.
  """
  # 'udpecho' is a registered type, so rejecting it first does not change
  # which error any given input produces.
  if server_type == 'udpecho':
    raise Exception('Please do not run UDP echo tests because we do not have '
                    'a UDP forwarder tool.')
  if server_type in SERVER_TYPES:
    return SERVER_TYPES[server_type]
  raise NotImplementedError('Unknown server type: %s' % server_type)
98
99
100class TestServerThread(threading.Thread):
101  """A thread to run the test server in a separate process."""
102
103  def __init__(self, ready_event, arguments, adb, tool):
104    """Initialize TestServerThread with the following argument.
105
106    Args:
107      ready_event: event which will be set when the test server is ready.
108      arguments: dictionary of arguments to run the test server.
109      adb: instance of AndroidCommands.
110      tool: instance of runtime error detection tool.
111    """
112    threading.Thread.__init__(self)
113    self.wait_event = threading.Event()
114    self.stop_flag = False
115    self.ready_event = ready_event
116    self.ready_event.clear()
117    self.arguments = arguments
118    self.adb = adb
119    self.tool = tool
120    self.test_server_process = None
121    self.is_ready = False
122    self.host_port = self.arguments['port']
123    assert isinstance(self.host_port, int)
124    # The forwarder device port now is dynamically allocated.
125    self.forwarder_device_port = 0
126    # Anonymous pipe in order to get port info from test server.
127    self.pipe_in = None
128    self.pipe_out = None
129    self.command_line = []
130
131  def _WaitToStartAndGetPortFromTestServer(self):
132    """Waits for the Python test server to start and gets the port it is using.
133
134    The port information is passed by the Python test server with a pipe given
135    by self.pipe_out. It is written as a result to |self.host_port|.
136
137    Returns:
138      Whether the port used by the test server was successfully fetched.
139    """
140    assert self.host_port == 0 and self.pipe_out and self.pipe_in
141    (in_fds, _, _) = select.select([self.pipe_in, ], [], [],
142                                   TEST_SERVER_STARTUP_TIMEOUT)
143    if len(in_fds) == 0:
144      logging.error('Failed to wait to the Python test server to be started.')
145      return False
146    # First read the data length as an unsigned 4-byte value.  This
147    # is _not_ using network byte ordering since the Python test server packs
148    # size as native byte order and all Chromium platforms so far are
149    # configured to use little-endian.
150    # TODO(jnd): Change the Python test server and local_test_server_*.cc to
151    # use a unified byte order (either big-endian or little-endian).
152    data_length = os.read(self.pipe_in, struct.calcsize('=L'))
153    if data_length:
154      (data_length,) = struct.unpack('=L', data_length)
155      assert data_length
156    if not data_length:
157      logging.error('Failed to get length of server data.')
158      return False
159    port_json = os.read(self.pipe_in, data_length)
160    if not port_json:
161      logging.error('Failed to get server data.')
162      return False
163    logging.info('Got port json data: %s', port_json)
164    port_json = json.loads(port_json)
165    if port_json.has_key('port') and isinstance(port_json['port'], int):
166      self.host_port = port_json['port']
167      return _CheckPortStatus(self.host_port, True)
168    logging.error('Failed to get port information from the server data.')
169    return False
170
171  def _GenerateCommandLineArguments(self):
172    """Generates the command line to run the test server.
173
174    Note that all options are processed by following the definitions in
175    testserver.py.
176    """
177    if self.command_line:
178      return
179    # The following arguments must exist.
180    type_cmd = _GetServerTypeCommandLine(self.arguments['server-type'])
181    if type_cmd:
182      self.command_line.append(type_cmd)
183    self.command_line.append('--port=%d' % self.host_port)
184    # Use a pipe to get the port given by the instance of Python test server
185    # if the test does not specify the port.
186    if self.host_port == 0:
187      (self.pipe_in, self.pipe_out) = os.pipe()
188      self.command_line.append('--startup-pipe=%d' % self.pipe_out)
189    self.command_line.append('--host=%s' % self.arguments['host'])
190    data_dir = self.arguments['data-dir'] or 'chrome/test/data'
191    if not os.path.isabs(data_dir):
192      data_dir = os.path.join(constants.DIR_SOURCE_ROOT, data_dir)
193    self.command_line.append('--data-dir=%s' % data_dir)
194    # The following arguments are optional depending on the individual test.
195    if self.arguments.has_key('log-to-console'):
196      self.command_line.append('--log-to-console')
197    if self.arguments.has_key('auth-token'):
198      self.command_line.append('--auth-token=%s' % self.arguments['auth-token'])
199    if self.arguments.has_key('https'):
200      self.command_line.append('--https')
201      if self.arguments.has_key('cert-and-key-file'):
202        self.command_line.append('--cert-and-key-file=%s' % os.path.join(
203            constants.DIR_SOURCE_ROOT, self.arguments['cert-and-key-file']))
204      if self.arguments.has_key('ocsp'):
205        self.command_line.append('--ocsp=%s' % self.arguments['ocsp'])
206      if self.arguments.has_key('https-record-resume'):
207        self.command_line.append('--https-record-resume')
208      if self.arguments.has_key('ssl-client-auth'):
209        self.command_line.append('--ssl-client-auth')
210      if self.arguments.has_key('tls-intolerant'):
211        self.command_line.append('--tls-intolerant=%s' %
212                                 self.arguments['tls-intolerant'])
213      if self.arguments.has_key('ssl-client-ca'):
214        for ca in self.arguments['ssl-client-ca']:
215          self.command_line.append('--ssl-client-ca=%s' %
216                                   os.path.join(constants.DIR_SOURCE_ROOT, ca))
217      if self.arguments.has_key('ssl-bulk-cipher'):
218        for bulk_cipher in self.arguments['ssl-bulk-cipher']:
219          self.command_line.append('--ssl-bulk-cipher=%s' % bulk_cipher)
220
221  def _CloseUnnecessaryFDsForTestServerProcess(self):
222    # This is required to avoid subtle deadlocks that could be caused by the
223    # test server child process inheriting undesirable file descriptors such as
224    # file lock file descriptors.
225    for fd in xrange(0, 1024):
226      if fd != self.pipe_out:
227        try:
228          os.close(fd)
229        except:
230          pass
231
232  def run(self):
233    logging.info('Start running the thread!')
234    self.wait_event.clear()
235    self._GenerateCommandLineArguments()
236    command = constants.DIR_SOURCE_ROOT
237    if self.arguments['server-type'] == 'sync':
238      command = [os.path.join(command, 'sync', 'tools', 'testserver',
239                              'sync_testserver.py')] + self.command_line
240    else:
241      command = [os.path.join(command, 'net', 'tools', 'testserver',
242                              'testserver.py')] + self.command_line
243    logging.info('Running: %s', command)
244    self.process = subprocess.Popen(
245        command, preexec_fn=self._CloseUnnecessaryFDsForTestServerProcess)
246    if self.process:
247      if self.pipe_out:
248        self.is_ready = self._WaitToStartAndGetPortFromTestServer()
249      else:
250        self.is_ready = _CheckPortStatus(self.host_port, True)
251    if self.is_ready:
252      Forwarder.Map([(0, self.host_port)], self.adb, self.tool)
253      # Check whether the forwarder is ready on the device.
254      self.is_ready = False
255      device_port = Forwarder.DevicePortForHostPort(self.host_port)
256      if device_port and _CheckDevicePortStatus(self.adb, device_port):
257        self.is_ready = True
258        self.forwarder_device_port = device_port
259    # Wake up the request handler thread.
260    self.ready_event.set()
261    # Keep thread running until Stop() gets called.
262    _WaitUntil(lambda: self.stop_flag, max_attempts=sys.maxint)
263    if self.process.poll() is None:
264      self.process.kill()
265    Forwarder.UnmapDevicePort(self.forwarder_device_port, self.adb)
266    self.process = None
267    self.is_ready = False
268    if self.pipe_out:
269      os.close(self.pipe_in)
270      os.close(self.pipe_out)
271      self.pipe_in = None
272      self.pipe_out = None
273    logging.info('Test-server has died.')
274    self.wait_event.set()
275
276  def Stop(self):
277    """Blocks until the loop has finished.
278
279    Note that this must be called in another thread.
280    """
281    if not self.process:
282      return
283    self.stop_flag = True
284    self.wait_event.wait()
285
286
class SpawningServerRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
  """A handler used to process http GET/POST request.

  Supported requests are POST /start (spawn a test server from a JSON body),
  GET /kill (stop the current test server) and GET /ping (liveness check).
  The shared state lives on |self.server| (test_server_instance, adb, tool).
  """

  def _SendResponse(self, response_code, response_reason, additional_headers,
                    contents):
    """Generates a response sent to the client from the provided parameters.

    Args:
      response_code: number of the response status.
      response_reason: string of reason description of the response.
      additional_headers: dict of additional headers. Each key is the name of
                          the header, each value is the content of the header.
      contents: string of the contents we want to send to client.
    """
    self.send_response(response_code, response_reason)
    self.send_header('Content-Type', 'text/html')
    # Specify the content-length as without it the http(s) response will not
    # be completed properly (and the browser keeps expecting data).
    self.send_header('Content-Length', len(contents))
    for header_name in additional_headers:
      self.send_header(header_name, additional_headers[header_name])
    self.end_headers()
    self.wfile.write(contents)
    self.wfile.flush()

  def _StartTestServer(self):
    """Starts the test server thread.

    Reads the JSON arguments from the request body, starts a TestServerThread
    with them and blocks until that thread signals its ready event. Replies
    200 with the forwarder device port on success, 500 otherwise.

    Raises:
      Exception: if the content-type is not 'application/json' or the
          content-length header is not an integer.
    """
    logging.info('Handling request to spawn a test server.')
    content_type = self.headers.getheader('content-type')
    if content_type != 'application/json':
      raise Exception('Bad content-type for start request.')
    # A missing or empty content-length header is treated as zero.
    content_length = self.headers.getheader('content-length')
    if not content_length:
      content_length = 0
    try:
      content_length = int(content_length)
    except ValueError:
      # Only a malformed number is a client error; anything else (such as an
      # interrupt) must keep propagating untouched.
      raise Exception('Bad content-length for start request.')
    logging.info(content_length)
    test_server_argument_json = self.rfile.read(content_length)
    logging.info(test_server_argument_json)
    assert not self.server.test_server_instance
    ready_event = threading.Event()
    self.server.test_server_instance = TestServerThread(
        ready_event,
        json.loads(test_server_argument_json),
        self.server.adb,
        self.server.tool)
    self.server.test_server_instance.setDaemon(True)
    self.server.test_server_instance.start()
    ready_event.wait()
    if self.server.test_server_instance.is_ready:
      self._SendResponse(200, 'OK', {}, json.dumps(
          {'port': self.server.test_server_instance.forwarder_device_port,
           'message': 'started'}))
      logging.info('Test server is running on port: %d.',
                   self.server.test_server_instance.host_port)
    else:
      self.server.test_server_instance.Stop()
      self.server.test_server_instance = None
      self._SendResponse(500, 'Test Server Error.', {}, '')
      logging.info('Encounter problem during starting a test server.')

  def _KillTestServer(self):
    """Stops the test server instance.

    Replies 200 'killed' once the host port is observed to be free, 500
    otherwise.
    """
    # There should only ever be one test server at a time. This may do the
    # wrong thing if we try and start multiple test servers.
    if not self.server.test_server_instance:
      # NOTE(review): no response is written on this path; confirm that the
      # client copes with that.
      return
    port = self.server.test_server_instance.host_port
    logging.info('Handling request to kill a test server on port: %d.', port)
    self.server.test_server_instance.Stop()
    # Make sure the status of test server is correct before sending response.
    if _CheckPortStatus(port, False):
      self._SendResponse(200, 'OK', {}, 'killed')
      logging.info('Test server on port %d is killed', port)
    else:
      self._SendResponse(500, 'Test Server Error.', {}, '')
      logging.info('Encounter problem during killing a test server.')
    self.server.test_server_instance = None

  def do_POST(self):
    """Dispatches POST requests; only the '/start' action is known."""
    parsed_path = urlparse.urlparse(self.path)
    action = parsed_path.path
    logging.info('Action for POST method is: %s.', action)
    if action == '/start':
      self._StartTestServer()
    else:
      self._SendResponse(400, 'Unknown request.', {}, '')
      logging.info('Encounter unknown request: %s.', action)

  def do_GET(self):
    """Dispatches GET requests; '/kill' and '/ping' are the known actions."""
    parsed_path = urlparse.urlparse(self.path)
    action = parsed_path.path
    # The query parameters are only logged; no action reads them.
    params = urlparse.parse_qs(parsed_path.query, keep_blank_values=1)
    logging.info('Action for GET method is: %s.', action)
    for param in params:
      logging.info('%s=%s', param, params[param][0])
    if action == '/kill':
      self._KillTestServer()
    elif action == '/ping':
      # The ping handler is used to check whether the spawner server is ready
      # to serve the requests. We don't need to test the status of the test
      # server when handling ping request.
      self._SendResponse(200, 'OK', {}, 'ready')
      logging.info('Handled ping request and sent response.')
    else:
      self._SendResponse(400, 'Unknown request', {}, '')
      logging.info('Encounter unknown request: %s.', action)
396
397
class SpawningServer(object):
  """The class used to start/stop a http server.

  The wrapped HTTP server handles requests with SpawningServerRequestHandler
  and carries the state that handler works with (adb, tool and the current
  test server instance).
  """

  def __init__(self, test_server_spawner_port, adb, tool):
    """Creates the HTTP server; it does not serve until Start() is called.

    Args:
      test_server_spawner_port: port number the spawner listens on.
      adb: object stored on the server for the request handler to use.
      tool: object stored on the server for the request handler to use.
    """
    logging.info('Creating new spawner on port: %d.', test_server_spawner_port)
    self.server = BaseHTTPServer.HTTPServer(('', test_server_spawner_port),
                                            SpawningServerRequestHandler)
    self.server.adb = adb
    self.server.tool = tool
    self.server.test_server_instance = None
    self.server.build_type = constants.GetBuildType()

  def _Listen(self):
    """Serves requests until the server is shut down."""
    logging.info('Starting test server spawner')
    self.server.serve_forever()

  def Start(self):
    """Starts the test server spawner."""
    # The listener runs on a daemon thread and is never joined.
    listener_thread = threading.Thread(target=self._Listen)
    listener_thread.setDaemon(True)
    listener_thread.start()

  def Stop(self):
    """Stops the test server spawner.

    Also cleans the server state and releases the listening socket.
    """
    self.CleanupState()
    self.server.shutdown()
    # shutdown() only ends the serve_forever() loop; the listening socket is
    # released by server_close().
    self.server.server_close()

  def CleanupState(self):
    """Cleans up the spawning server state.

    This should be called if the test server spawner is reused,
    to avoid sharing the test server instance.
    """
    if self.server.test_server_instance:
      self.server.test_server_instance.Stop()
      self.server.test_server_instance = None
437