• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2017 The Chromium Authors
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""A "Test Server Spawner" that handles killing/stopping per-test test servers.
6
7It's used to accept requests from the device to spawn and kill instances of the
8chrome test server on the host.
9"""
10# pylint: disable=W0702
11
12import json
13import logging
14import os
15import select
16import struct
17import subprocess
18import sys
19import threading
20import time
21import urllib
22
23from http.server import BaseHTTPRequestHandler
24from http.server import HTTPServer
25
# Maps each supported server type to the testserver.py switch that selects it.
# An empty string means no extra switch is passed for that type.
SERVER_TYPES = dict(
    http='',
    ftp='-f',
    ws='--websocket',
)


# Source root: four directory levels above the directory holding this file.
_DIR_SOURCE_ROOT = os.path.abspath(
    os.path.join(os.path.dirname(__file__), *([os.pardir] * 4)))


# Module-level logger shared by everything in this file.
_logger = logging.getLogger(__name__)


# Extend PYTHONPATH (inherited by the spawned testserver) with the directories
# it imports its modules from. Any existing value is kept as the first entry.
os.environ['PYTHONPATH'] = ':'.join([
    os.environ.get('PYTHONPATH', ''),
    os.path.join(_DIR_SOURCE_ROOT, 'third_party'),
    os.path.join(_DIR_SOURCE_ROOT, 'net', 'tools', 'testserver'),
])
45
46
def _GetServerTypeCommandLine(server_type):
  """Looks up the testserver switch registered for a server type.

  Args:
    server_type: name of the server type (e.g. 'http'); must be one of the
        keys of SERVER_TYPES.

  Returns:
    The command-line switch string for |server_type|. It may be empty.

  Raises:
    NotImplementedError: if |server_type| is not a key of SERVER_TYPES.
  """
  if server_type in SERVER_TYPES:
    return SERVER_TYPES[server_type]
  raise NotImplementedError('Unknown server type: %s' % server_type)
59
60
class PortForwarder:
  """Pass-through port forwarder.

  Every method either does nothing or returns a fixed answer, and device
  ports are reported as equal to host ports.
  """

  def Map(self, port_pairs):
    """Accepts |port_pairs| and does nothing with them."""

  def GetDevicePortForHostPort(self, host_port):
    """Returns |host_port| itself as the matching device port."""
    return host_port

  def WaitHostPortAvailable(self, port):
    """Reports |port| as available; always returns True."""
    return True

  def WaitPortNotAvailable(self, port):
    """Reports |port| as not available; always returns True."""
    return True

  def WaitDevicePortReady(self, port):
    """Reports |port| as in use; always returns True."""
    return True

  def Unmap(self, device_port):
    """Accepts |device_port| and does nothing with it."""
84
85
class TestServerThread(threading.Thread):
  """A thread to run the test server in a separate process.

  run() launches testserver.py, reads back the port it reports through a pipe,
  maps that port through the port forwarder and then blocks until Stop() is
  called, at which point the child process is killed and the port unmapped.
  """

  def __init__(self, ready_event, arguments, port_forwarder):
    """Initializes the thread state; the server is only launched by run().

    Args:
      ready_event: event which is set once startup has finished, whether it
          succeeded or not (check |is_ready| afterwards). It is cleared here.
      arguments: dictionary of arguments to run the test server. The
          'server-type' entry selects the server type; every other entry is
          passed on as a '--key' or '--key=value' switch.
      port_forwarder: object with the PortForwarder interface, used to map
          the host port(s) of the test server.
    """
    threading.Thread.__init__(self)
    self.wait_event = threading.Event()
    self.stop_event = threading.Event()
    self.ready_event = ready_event
    self.ready_event.clear()
    self.arguments = arguments
    self.port_forwarder = port_forwarder
    self.test_server_process = None
    self.is_ready = False
    self.host_port = 0
    self.host_ocsp_port = 0
    # The forwarder device port now is dynamically allocated.
    self.forwarder_device_port = 0
    self.forwarder_ocsp_device_port = 0
    self.process = None
    self.command_line = []

  @staticmethod
  def _ReadExactly(fd, size):
    """Reads |size| bytes from |fd|, looping over short reads.

    Returns:
      The bytes read. The result is shorter than |size| only if end-of-file
      was reached first.
    """
    chunks = []
    remaining = size
    while remaining > 0:
      chunk = os.read(fd, remaining)
      if not chunk:
        break
      chunks.append(chunk)
      remaining -= len(chunk)
    return b''.join(chunks)

  def _WaitToStartAndGetPortFromTestServer(self, pipe_in):
    """Waits for the Python test server to start and gets the port it is using.

    The port information is passed by the Python test server with a pipe given
    by |pipe_in|. It is written as a result to |self.host_port| (and
    |self.host_ocsp_port| when the server reports an 'ocsp_port').

    Returns:
      Whether the port used by the test server was successfully fetched.
    """
    (in_fds, _, _) = select.select([pipe_in], [], [])
    if not in_fds:
      _logger.error('Failed to wait to the Python test server to be started.')
      return False
    # First read the data length as an unsigned 4-byte value.  This
    # is _not_ using network byte ordering since the Python test server packs
    # size as native byte order and all Chromium platforms so far are
    # configured to use little-endian.
    # TODO(jnd): Change the Python test server and local_test_server_*.cc to
    # use a unified byte order (either big-endian or little-endian).
    header_size = struct.calcsize('=L')
    header = self._ReadExactly(pipe_in, header_size)
    # A truncated header (e.g. the server died mid-write) must not reach
    # struct.unpack, which would raise instead of reporting failure.
    if len(header) != header_size:
      _logger.error('Failed to get length of server data.')
      return False
    (data_length,) = struct.unpack('=L', header)
    if not data_length:
      _logger.error('Failed to get length of server data.')
      return False
    server_data_json = self._ReadExactly(pipe_in, data_length)
    if len(server_data_json) != data_length:
      _logger.error('Failed to get server data.')
      return False
    _logger.info('Got port json data: %s', server_data_json)

    parsed_server_data = None
    try:
      parsed_server_data = json.loads(server_data_json)
    except ValueError:
      pass

    if not isinstance(parsed_server_data, dict):
      _logger.error('Failed to parse server_data: %s', server_data_json)
      return False

    if not isinstance(parsed_server_data.get('port'), int):
      _logger.error('Failed to get port information from the server data.')
      return False

    self.host_port = parsed_server_data['port']
    self.host_ocsp_port = parsed_server_data.get('ocsp_port', 0)

    return self.port_forwarder.WaitPortNotAvailable(self.host_port)

  def _GenerateCommandLineArguments(self, pipe_out):
    """Generates the command line to run the test server.

    Note that all options are processed by following the definitions in
    testserver.py. The result is stored in |self.command_line|; nothing is
    done if it has already been generated.

    Args:
      pipe_out: file descriptor passed to the server as its startup pipe.

    Raises:
      KeyError: if |self.arguments| has no 'server-type' entry.
      NotImplementedError: if the server type is unknown.
    """
    if self.command_line:
      return

    args_copy = dict(self.arguments)

    # Translate the server type.
    type_cmd = _GetServerTypeCommandLine(args_copy.pop('server-type'))
    if type_cmd:
      self.command_line.append(type_cmd)

    # Use a pipe to get the port given by the Python test server.
    self.command_line.append('--startup-pipe=%d' % pipe_out)

    # Pass the remaining arguments as-is.
    for key, values in args_copy.items():
      if not isinstance(values, list):
        values = [values]
      for value in values:
        if value is None:
          self.command_line.append('--%s' % key)
        else:
          self.command_line.append('--%s=%s' % (key, value))

  def _CloseUnnecessaryFDsForTestServerProcess(self, pipe_out):
    """Closes file descriptors 3..1023 except |pipe_out|.

    Used as the child's preexec hook when launching the test server.
    """
    # This is required to avoid subtle deadlocks that could be caused by the
    # test server child process inheriting undesirable file descriptors such as
    # file lock file descriptors. Note stdin, stdout, and stderr (0-2) are left
    # alone and redirected with subprocess.Popen. It is important to leave those
    # fds filled, or the test server will accidentally open other fds at those
    # numbers.
    # os.closerange ignores descriptors that are not open.
    os.closerange(3, min(pipe_out, 1024))
    os.closerange(max(pipe_out + 1, 3), 1024)

  def _LaunchTestServerProcess(self, pipe_out):
    """Starts testserver.py and stores the process in |self.process|.

    Args:
      pipe_out: write end of the startup pipe, inherited by the child.
    """
    self._GenerateCommandLineArguments(pipe_out)
    command = [
        'vpython3',
        os.path.join(_DIR_SOURCE_ROOT, 'net', 'tools', 'testserver',
                     'testserver.py')
    ] + self.command_line
    _logger.info('Running: %s', command)

    # Disable PYTHONUNBUFFERED because it has a bad interaction with the
    # testserver. Remove once this interaction is fixed.
    unbuf = os.environ.pop('PYTHONUNBUFFERED', None)
    try:
      # Pass _DIR_SOURCE_ROOT as the child's working directory so that
      # relative paths in the arguments are resolved correctly.
      with open(os.devnull, 'r+b') as devnull:
        self.process = subprocess.Popen(
            command,
            preexec_fn=lambda: self._CloseUnnecessaryFDsForTestServerProcess(
                pipe_out),
            stdin=devnull,
            # Preserve stdout and stderr from the test server.
            stdout=None,
            stderr=None,
            cwd=_DIR_SOURCE_ROOT,
            close_fds=False)
    finally:
      # Restore the variable even when the launch itself failed.
      if unbuf is not None:
        os.environ['PYTHONUNBUFFERED'] = unbuf

  def _ForwardPorts(self):
    """Maps the server port(s) through the forwarder.

    Returns:
      Whether a device port was obtained and the forwarder reports it ready.
    """
    port_map = [(0, self.host_port)]
    if self.host_ocsp_port:
      port_map.append((0, self.host_ocsp_port))
    self.port_forwarder.Map(port_map)

    self.forwarder_device_port = (
        self.port_forwarder.GetDevicePortForHostPort(self.host_port))
    if self.host_ocsp_port:
      self.forwarder_ocsp_device_port = (
          self.port_forwarder.GetDevicePortForHostPort(self.host_ocsp_port))

    # Check whether the forwarder is ready on the device.
    return bool(
        self.forwarder_device_port and
        self.port_forwarder.WaitDevicePortReady(self.forwarder_device_port))

  def run(self):
    """Runs the test server until Stop() is called.

    |ready_event| is always set once startup has either succeeded or failed,
    and |wait_event| is always set when this method finishes, so that threads
    blocked on either event are released even if something goes wrong.
    """
    _logger.info('Start running the thread!')
    self.wait_event.clear()

    # Set up a pipe for the server to report when it has started.
    pipe_in, pipe_out = os.pipe()

    try:
      try:
        os.set_inheritable(pipe_out, True)
        self._LaunchTestServerProcess(pipe_out)

        # Close pipe_out early. If self.process crashes, this will be visible
        # in _WaitToStartAndGetPortFromTestServer's select loop.
        os.close(pipe_out)
        pipe_out = -1

        self.is_ready = bool(
            self._WaitToStartAndGetPortFromTestServer(pipe_in) and
            self._ForwardPorts())
      except Exception:
        # A startup failure is reported through |is_ready| instead of killing
        # the thread silently; otherwise whoever waits on |ready_event| would
        # block forever.
        _logger.exception('Failed to start the test server.')
        self.is_ready = False
      finally:
        # Wake up the request handler thread.
        self.ready_event.set()

      if self.process is not None:
        # Keep thread running until Stop() gets called.
        self.stop_event.wait()
        if self.process.poll() is None:
          self.process.kill()
          # Wait for process to actually terminate.
          # (crbug.com/946475)
          self.process.wait()

        self.port_forwarder.Unmap(self.forwarder_device_port)
        self.process = None
      self.is_ready = False
    finally:
      try:
        for fd in (pipe_in, pipe_out):
          if fd >= 0:
            os.close(fd)
      finally:
        _logger.info('Test-server has died.')
        self.wait_event.set()

  def Stop(self):
    """Blocks until the loop has finished.

    Returns immediately if no test server process is running. Note that this
    must be called in another thread.
    """
    if not self.process:
      return
    self.stop_event.set()
    self.wait_event.wait()
305
306
class SpawningServerRequestHandler(BaseHTTPRequestHandler):
  """A handler used to process http GET/POST request.

  POST /start spawns a test server, GET /kill?port=N stops one and GET /ping
  reports that the spawner is up. The handler relies on the owning server
  object carrying |test_servers|, |max_instances| and |port_forwarder|.
  """

  def _SendResponse(self, response_code, response_reason, additional_headers,
                    contents):
    """Generates a response sent to the client from the provided parameters.

    Args:
      response_code: number of the response status.
      response_reason: string of reason description of the response.
      additional_headers: dict of additional headers. Each key is the name of
                          the header, each value is the content of the header.
      contents: string of the contents we want to send to client.
    """
    # Encode first so that Content-Length counts bytes, not characters.
    body = contents.encode('utf8')
    self.send_response(response_code, response_reason)
    self.send_header('Content-Type', 'text/html')
    # Specify the content-length as without it the http(s) response will not
    # be completed properly (and the browser keeps expecting data).
    self.send_header('Content-Length', len(body))
    for header_name, header_value in additional_headers.items():
      self.send_header(header_name, header_value)
    self.end_headers()
    self.wfile.write(body)
    self.wfile.flush()

  def _StartTestServer(self):
    """Starts the test server thread.

    Reads the JSON arguments from the request body, spawns a TestServerThread
    and waits for it to finish starting. Responds with 200 and the forwarded
    port(s) on success, 400 if the instance limit is reached, or 500 if the
    server failed to start.

    Raises:
      Exception: if the content-type is not 'application/json' or the
          content-length is not a number.
    """
    _logger.info('Handling request to spawn a test server.')
    content_type = self.headers.get('content-type')
    if content_type != 'application/json':
      raise Exception('Bad content-type for start request.')
    try:
      # A missing or empty header counts as an empty body.
      content_length = int(self.headers.get('content-length') or 0)
    except ValueError:
      raise Exception('Bad content-length for start request.')
    _logger.info(content_length)
    test_server_argument_json = self.rfile.read(content_length)
    _logger.info(test_server_argument_json)

    if len(self.server.test_servers) >= self.server.max_instances:
      self._SendResponse(400, 'Invalid request', {},
                         'Too many test servers running')
      return

    ready_event = threading.Event()
    new_server = TestServerThread(ready_event,
                                  json.loads(test_server_argument_json),
                                  self.server.port_forwarder)
    new_server.daemon = True
    new_server.start()
    ready_event.wait()
    if new_server.is_ready:
      response = {'port': new_server.forwarder_device_port,
                  'message': 'started'}
      if new_server.forwarder_ocsp_device_port:
        response['ocsp_port'] = new_server.forwarder_ocsp_device_port
      self._SendResponse(200, 'OK', {}, json.dumps(response))
      _logger.info('Test server is running on port %d forwarded to %d.',
                   new_server.forwarder_device_port, new_server.host_port)
      port = new_server.forwarder_device_port
      assert port not in self.server.test_servers
      self.server.test_servers[port] = new_server
    else:
      new_server.Stop()
      self._SendResponse(500, 'Test Server Error.', {}, '')
      _logger.info('Encounter problem during starting a test server.')

  def _KillTestServer(self, params):
    """Stops the test server instance.

    Args:
      params: parsed query parameters, mapping each name to a list of values.
          The first 'port' value identifies the server to stop.
    """
    try:
      port = int(params['port'][0])
    except (KeyError, IndexError, ValueError):
      # A missing, empty or non-numeric port is answered with a 400 below.
      port = None
    if port is None or port <= 0:
      self._SendResponse(400, 'Invalid request.', {}, 'port must be specified')
      return

    if port not in self.server.test_servers:
      self._SendResponse(400, 'Invalid request.', {},
                         "testserver isn't running on port %d" % port)
      return

    server = self.server.test_servers.pop(port)

    _logger.info('Handling request to kill a test server on port: %d.', port)
    server.Stop()

    # Make sure the status of test server is correct before sending response.
    if self.server.port_forwarder.WaitHostPortAvailable(port):
      self._SendResponse(200, 'OK', {}, 'killed')
      _logger.info('Test server on port %d is killed', port)
    else:
      # We expect the port to be free, but nothing stops the system from
      # binding something else to that port, so don't throw error.
      # (crbug.com/946475)
      self._SendResponse(200, 'OK', {}, '')
      _logger.warning('Port %s is not free after killing test server.', port)

  def log_message(self, format, *args):
    """Discards the base handler's per-request log lines."""
    # Requests are reported through _logger in do_GET/do_POST instead.
    # NOTE(review): the default logging is suppressed at every log level;
    # confirm whether it should be forwarded when INFO logging is enabled.

  def do_POST(self):
    """Handles POST requests; only '/start' is supported."""
    action = urllib.parse.urlparse(self.path).path
    _logger.info('Action for POST method is: %s.', action)
    if action == '/start':
      self._StartTestServer()
    else:
      self._SendResponse(400, 'Unknown request.', {}, '')
      _logger.info('Encounter unknown request: %s.', action)

  def do_GET(self):
    """Handles GET requests; '/kill' and '/ping' are supported."""
    parsed_path = urllib.parse.urlparse(self.path)
    action = parsed_path.path
    params = urllib.parse.parse_qs(parsed_path.query, keep_blank_values=True)
    _logger.info('Action for GET method is: %s.', action)
    for name, values in params.items():
      _logger.info('%s=%s', name, values[0])
    if action == '/kill':
      self._KillTestServer(params)
    elif action == '/ping':
      # The ping handler is used to check whether the spawner server is ready
      # to serve the requests. We don't need to test the status of the test
      # server when handling ping request.
      self._SendResponse(200, 'OK', {}, 'ready')
      _logger.info('Handled ping request and sent response.')
    else:
      self._SendResponse(400, 'Unknown request', {}, '')
      _logger.info('Encounter unknown request: %s.', action)
442
443
class SpawningServer(object):
  """The class used to start/stop a http server."""

  def __init__(self, test_server_spawner_port, port_forwarder, max_instances):
    """Creates the HTTP server; it does not serve until Start() is called.

    Args:
      test_server_spawner_port: port for the spawner to listen on. The port
          actually bound is exposed as |server_port|.
      port_forwarder: port forwarder handed to the request handlers.
      max_instances: maximum number of test servers running at once.
    """
    self.server = HTTPServer(('', test_server_spawner_port),
                             SpawningServerRequestHandler)
    self.server_port = self.server.server_port
    _logger.info('Started test server spawner on port: %d.', self.server_port)

    # State shared with the request handlers through the server object.
    self.server.port_forwarder = port_forwarder
    self.server.test_servers = {}
    self.server.max_instances = max_instances

  def _Listen(self):
    """Serves requests until the server is shut down."""
    _logger.info('Starting test server spawner.')
    self.server.serve_forever()

  def Start(self):
    """Starts the test server spawner on a daemon thread."""
    listener_thread = threading.Thread(target=self._Listen, daemon=True)
    listener_thread.start()

  def Stop(self):
    """Stops the test server spawner.

    Also cleans the server state.
    """
    # NOTE(review): shutdown() waits for the serve_forever() loop to exit, so
    # this presumably must only be called after Start() - confirm.
    self.CleanupState()
    self.server.shutdown()

  def CleanupState(self):
    """Cleans up the spawning server state.

    This should be called if the test server spawner is reused,
    to avoid sharing the test server instance.
    """
    if self.server.test_servers:
      _logger.warning('Not all test servers were stopped.')
      for port, test_server in self.server.test_servers.items():
        _logger.warning('Stopping test server on port %d', port)
        test_server.Stop()
      self.server.test_servers = {}
481    if self.server.test_servers:
482      _logger.warning('Not all test servers were stopped.')
483      for port in self.server.test_servers:
484        _logger.warning('Stopping test server on port %d' % port)
485        self.server.test_servers[port].Stop()
486      self.server.test_servers = {}
487