# Copyright 2017 The Chromium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""A "Test Server Spawner" that handles killing/stopping per-test test servers.

It's used to accept requests from the device to spawn and kill instances of the
chrome test server on the host.
"""
# pylint: disable=W0702

import json
import logging
import os
import select
import struct
import subprocess
import sys
import threading
import time
import urllib
import urllib.parse

from http.server import BaseHTTPRequestHandler
from http.server import HTTPServer

# Maps the 'server-type' request argument to the testserver.py flag that
# selects it. An empty string means no flag is needed.
SERVER_TYPES = {
    'http': '',
    'ftp': '-f',
    'ws': '--websocket',
}


_DIR_SOURCE_ROOT = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, os.pardir,
                 os.pardir))


_logger = logging.getLogger(__name__)


# Paths that are needed to import necessary modules when launching a
# testserver.
os.environ['PYTHONPATH'] = os.environ.get('PYTHONPATH', '') + (
    ':%s:%s' % (os.path.join(_DIR_SOURCE_ROOT, 'third_party'),
                os.path.join(_DIR_SOURCE_ROOT, 'net', 'tools', 'testserver')))


def _GetServerTypeCommandLine(server_type):
  """Returns the command-line by the given server type.

  Args:
    server_type: the server type to be used (e.g. 'http').

  Returns:
    A string containing the command-line argument.

  Raises:
    NotImplementedError: if |server_type| is not a key of SERVER_TYPES.
  """
  if server_type not in SERVER_TYPES:
    raise NotImplementedError('Unknown server type: %s' % server_type)
  return SERVER_TYPES[server_type]


def _ReadExactly(fd, size):
  """Reads up to |size| bytes from |fd|, retrying on short reads.

  Args:
    fd: file descriptor to read from.
    size: number of bytes wanted.

  Returns:
    The bytes read. Shorter than |size| only if EOF was reached first.
  """
  chunks = []
  remaining = size
  while remaining > 0:
    chunk = os.read(fd, remaining)
    if not chunk:
      break
    chunks.append(chunk)
    remaining -= len(chunk)
  return b''.join(chunks)


class PortForwarder:
  """Default port forwarder: every operation is a no-op that succeeds.

  Device ports are reported as identical to host ports.
  """

  def Map(self, port_pairs):
    """Maps the given list of (device_port, host_port) pairs. No-op here."""
    pass

  def GetDevicePortForHostPort(self, host_port):
    """Returns the device port that corresponds to a given host port."""
    return host_port

  def WaitHostPortAvailable(self, port):
    """Returns True if |port| is available."""
    return True

  def WaitPortNotAvailable(self, port):
    """Returns True if |port| is not available."""
    return True

  def WaitDevicePortReady(self, port):
    """Returns whether the provided port is used."""
    return True

  def Unmap(self, device_port):
    """Unmaps specified port"""
    pass


class TestServerThread(threading.Thread):
  """A thread to run the test server in a separate process."""

  def __init__(self, ready_event, arguments, port_forwarder):
    """Initialize TestServerThread with the following argument.

    Args:
      ready_event: event which will be set when the test server is ready (or
          when starting it has failed; check |is_ready| to tell them apart).
      arguments: dictionary of arguments to run the test server.
      port_forwarder: object providing the PortForwarder interface.
    """
    threading.Thread.__init__(self)
    self.wait_event = threading.Event()
    self.stop_event = threading.Event()
    self.ready_event = ready_event
    self.ready_event.clear()
    self.arguments = arguments
    self.port_forwarder = port_forwarder
    self.test_server_process = None
    self.is_ready = False
    self.host_port = 0
    self.host_ocsp_port = 0
    # The forwarder device port now is dynamically allocated.
    self.forwarder_device_port = 0
    self.forwarder_ocsp_device_port = 0
    self.process = None
    self.command_line = []

  def _WaitToStartAndGetPortFromTestServer(self, pipe_in):
    """Waits for the Python test server to start and gets the port it is using.

    The port information is passed by the Python test server with a pipe given
    by |pipe_in|. It is written as a result to |self.host_port|.

    Returns:
      Whether the port used by the test server was successfully fetched.
    """
    (in_fds, _, _) = select.select([pipe_in], [], [])
    if not in_fds:
      _logger.error('Failed to wait to the Python test server to be started.')
      return False
    # First read the data length as an unsigned 4-byte value.  This
    # is _not_ using network byte ordering since the Python test server packs
    # size as native byte order and all Chromium platforms so far are
    # configured to use little-endian.
    # TODO(jnd): Change the Python test server and local_test_server_*.cc to
    # use a unified byte order (either big-endian or little-endian).
    length_size = struct.calcsize('=L')
    raw_length = _ReadExactly(pipe_in, length_size)
    data_length = 0
    # A truncated prefix (e.g. the child died mid-write) must be reported as a
    # failure rather than blowing up in struct.unpack.
    if len(raw_length) == length_size:
      (data_length,) = struct.unpack('=L', raw_length)
    if not data_length:
      _logger.error('Failed to get length of server data.')
      return False
    server_data_json = _ReadExactly(pipe_in, data_length)
    if not server_data_json:
      _logger.error('Failed to get server data.')
      return False
    _logger.info('Got port json data: %s', server_data_json)

    parsed_server_data = None
    try:
      parsed_server_data = json.loads(server_data_json)
    except ValueError:
      pass

    if not isinstance(parsed_server_data, dict):
      _logger.error('Failed to parse server_data: %s', server_data_json)
      return False

    if not isinstance(parsed_server_data.get('port'), int):
      _logger.error('Failed to get port information from the server data.')
      return False

    self.host_port = parsed_server_data['port']
    self.host_ocsp_port = parsed_server_data.get('ocsp_port', 0)

    return self.port_forwarder.WaitPortNotAvailable(self.host_port)

  def _GenerateCommandLineArguments(self, pipe_out):
    """Generates the command line to run the test server.

    Note that all options are processed by following the definitions in
    testserver.py. The result is stored in |self.command_line|; if that is
    already populated this is a no-op.

    Args:
      pipe_out: write end of the startup pipe, passed as --startup-pipe.

    Raises:
      NotImplementedError: if the 'server-type' argument is unknown.
    """
    if self.command_line:
      return

    args_copy = dict(self.arguments)

    # Translate the server type.
    type_cmd = _GetServerTypeCommandLine(args_copy.pop('server-type'))
    if type_cmd:
      self.command_line.append(type_cmd)

    # Use a pipe to get the port given by the Python test server.
    self.command_line.append('--startup-pipe=%d' % pipe_out)

    # Pass the remaining arguments as-is. A None value means a bare flag; a
    # list value repeats the option once per element.
    for key, values in args_copy.items():
      if not isinstance(values, list):
        values = [values]
      for value in values:
        if value is None:
          self.command_line.append('--%s' % key)
        else:
          self.command_line.append('--%s=%s' % (key, value))

  def _CloseUnnecessaryFDsForTestServerProcess(self, pipe_out):
    """Closes every fd in [3, 1024) except |pipe_out| (runs in the child)."""
    # This is required to avoid subtle deadlocks that could be caused by the
    # test server child process inheriting undesirable file descriptors such as
    # file lock file descriptors. Note stdin, stdout, and stderr (0-2) are left
    # alone and redirected with subprocess.Popen. It is important to leave those
    # fds filled, or the test server will accidentally open other fds at those
    # numbers.
    for fd in range(3, 1024):
      if fd != pipe_out:
        try:
          os.close(fd)
        except OSError:
          # Most of these descriptors are simply not open.
          pass

  def _KillProcess(self):
    """Kills the test server process if it is still running and reaps it."""
    process = self.process
    if process is not None and process.poll() is None:
      process.kill()
      # Wait for process to actually terminate.
      # (crbug.com/946475)
      process.wait()

  def run(self):
    """Spawns the test server, waits for Stop(), then tears everything down.

    |ready_event| is always set before this returns or raises, so a thread
    waiting on it cannot hang; |is_ready| tells whether startup succeeded.
    """
    _logger.info('Start running the thread!')
    self.wait_event.clear()

    # Set up a pipe for the server to report when it has started.
    pipe_in, pipe_out = os.pipe()

    # TODO(crbug.com/40618161): Remove if condition after python3 migration.
    if hasattr(os, 'set_inheritable'):
      os.set_inheritable(pipe_out, True)

    try:
      self._GenerateCommandLineArguments(pipe_out)
      # TODO(crbug.com/40618161): When this script is ported to Python 3,
      # replace 'vpython3' below with sys.executable.
      command = [
          'vpython3',
          os.path.join(_DIR_SOURCE_ROOT, 'net', 'tools', 'testserver',
                       'testserver.py')
      ] + self.command_line
      _logger.info('Running: %s', command)

      # Disable PYTHONUNBUFFERED because it has a bad interaction with the
      # testserver. Remove once this interaction is fixed.
      unbuf = os.environ.pop('PYTHONUNBUFFERED', None)
      try:
        # Pass _DIR_SOURCE_ROOT as the child's working directory so that
        # relative paths in the arguments are resolved correctly. devnull can
        # be replaced with subprocess.DEVNULL in Python 3.
        with open(os.devnull, 'r+b') as devnull:
          self.process = subprocess.Popen(
              command,
              preexec_fn=lambda: self._CloseUnnecessaryFDsForTestServerProcess(
                  pipe_out),
              stdin=devnull,
              # Preserve stdout and stderr from the test server.
              stdout=None,
              stderr=None,
              cwd=_DIR_SOURCE_ROOT,
              close_fds=False)
      finally:
        # Restore the environment even if the spawn itself failed.
        if unbuf is not None:
          os.environ['PYTHONUNBUFFERED'] = unbuf

      # Close pipe_out early. If self.process crashes, this will be visible
      # in _WaitToStartAndGetPortFromTestServer's select loop.
      os.close(pipe_out)
      pipe_out = -1
      self.is_ready = self._WaitToStartAndGetPortFromTestServer(pipe_in)

      if self.is_ready:
        port_map = [(0, self.host_port)]
        if self.host_ocsp_port:
          port_map.append((0, self.host_ocsp_port))
        self.port_forwarder.Map(port_map)

        self.forwarder_device_port = \
            self.port_forwarder.GetDevicePortForHostPort(self.host_port)
        if self.host_ocsp_port:
          self.forwarder_ocsp_device_port = \
              self.port_forwarder.GetDevicePortForHostPort(self.host_ocsp_port)

        # Check whether the forwarder is ready on the device.
        self.is_ready = bool(
            self.forwarder_device_port and
            self.port_forwarder.WaitDevicePortReady(self.forwarder_device_port))

      # Wake up the request handler thread.
      self.ready_event.set()
      # Keep thread running until Stop() gets called.
      self.stop_event.wait()
      self._KillProcess()

      self.port_forwarder.Unmap(self.forwarder_device_port)
      self.process = None
      self.is_ready = False
    except Exception:
      # Never report a half-started server as ready.
      self.is_ready = False
      raise
    finally:
      if pipe_in >= 0:
        os.close(pipe_in)
      if pipe_out >= 0:
        os.close(pipe_out)
      if self.process is not None:
        # Only reached when an exception interrupted the normal path above;
        # make sure the child is not leaked.
        try:
          self._KillProcess()
        except OSError:
          _logger.exception('Failed to kill the test server process.')
        self.process = None
      # If startup failed before the event was set, the request handler is
      # still blocked on it; release it so it can report the error.
      self.ready_event.set()
      _logger.info('Test-server has died.')
      self.wait_event.set()

  def Stop(self):
    """Blocks until the loop has finished.

    Note that this must be called in another thread.
    """
    if not self.process:
      return
    self.stop_event.set()
    self.wait_event.wait()


class SpawningServerRequestHandler(BaseHTTPRequestHandler):
  """A handler used to process http GET/POST request."""

  def _SendResponse(self, response_code, response_reason, additional_headers,
                    contents):
    """Generates a response sent to the client from the provided parameters.

    Args:
      response_code: number of the response status.
      response_reason: string of reason description of the response.
      additional_headers: dict of additional headers. Each key is the name of
          the header, each value is the content of the header.
      contents: string of the contents we want to send to client.
    """
    # Encode first: Content-Length must count bytes on the wire, not
    # characters.
    body = contents.encode('utf8')
    self.send_response(response_code, response_reason)
    self.send_header('Content-Type', 'text/html')
    # Specify the content-length as without it the http(s) response will not
    # be completed properly (and the browser keeps expecting data).
    self.send_header('Content-Length', str(len(body)))
    for header_name, header_value in additional_headers.items():
      self.send_header(header_name, header_value)
    self.end_headers()
    self.wfile.write(body)
    self.wfile.flush()

  def _StartTestServer(self):
    """Starts the test server thread.

    Reads the JSON arguments from the request body, spawns a TestServerThread
    and replies with the forwarded port(s) once it is ready.

    Raises:
      Exception: if the content-type or content-length header is invalid.
    """
    _logger.info('Handling request to spawn a test server.')
    content_type = self.headers.get('content-type')
    if content_type != 'application/json':
      raise Exception('Bad content-type for start request.')
    content_length = self.headers.get('content-length')
    if not content_length:
      content_length = 0
    try:
      content_length = int(content_length)
    except ValueError as e:
      raise Exception('Bad content-length for start request.') from e
    _logger.info(content_length)
    test_server_argument_json = self.rfile.read(content_length)
    _logger.info(test_server_argument_json)

    if len(self.server.test_servers) >= self.server.max_instances:
      self._SendResponse(400, 'Invalid request', {},
                         'Too many test servers running')
      return

    ready_event = threading.Event()
    new_server = TestServerThread(ready_event,
                                  json.loads(test_server_argument_json),
                                  self.server.port_forwarder)
    new_server.daemon = True
    new_server.start()
    ready_event.wait()
    if new_server.is_ready:
      response = {'port': new_server.forwarder_device_port,
                  'message': 'started'}
      if new_server.forwarder_ocsp_device_port:
        response['ocsp_port'] = new_server.forwarder_ocsp_device_port
      self._SendResponse(200, 'OK', {}, json.dumps(response))
      _logger.info('Test server is running on port %d forwarded to %d.',
                   new_server.forwarder_device_port, new_server.host_port)
      port = new_server.forwarder_device_port
      assert port not in self.server.test_servers
      self.server.test_servers[port] = new_server
    else:
      new_server.Stop()
      self._SendResponse(500, 'Test Server Error.', {}, '')
      _logger.info('Encounter problem during starting a test server.')

  def _KillTestServer(self, params):
    """Stops the test server instance.

    Args:
      params: parsed query-string dict (name -> list of values); the first
          'port' value selects the server to stop.
    """
    try:
      port = int(params['port'][0])
    except (KeyError, IndexError, ValueError):
      # Missing or non-numeric port: answer with 400 below instead of crashing.
      port = None
    if port is None or port <= 0:
      self._SendResponse(400, 'Invalid request.', {}, 'port must be specified')
      return

    if port not in self.server.test_servers:
      self._SendResponse(400, 'Invalid request.', {},
                         "testserver isn't running on port %d" % port)
      return

    server = self.server.test_servers.pop(port)

    _logger.info('Handling request to kill a test server on port: %d.', port)
    server.Stop()

    # Make sure the status of test server is correct before sending response.
    if self.server.port_forwarder.WaitHostPortAvailable(port):
      self._SendResponse(200, 'OK', {}, 'killed')
      _logger.info('Test server on port %d is killed', port)
    else:
      # We expect the port to be free, but nothing stops the system from
      # binding something else to that port, so don't throw error.
      # (crbug.com/946475)
      self._SendResponse(200, 'OK', {}, '')
      _logger.warning('Port %s is not free after killing test server.', port)

  def log_message(self, format, *args):
    """Suppresses the default per-request logging of BaseHTTPRequestHandler."""
    # Intentionally a no-op: the request handlers in this class already report
    # what they do through _logger.

  def do_POST(self):
    """Handles POST requests; only '/start' is supported."""
    parsed_path = urllib.parse.urlparse(self.path)
    action = parsed_path.path
    _logger.info('Action for POST method is: %s.', action)
    if action == '/start':
      self._StartTestServer()
    else:
      self._SendResponse(400, 'Unknown request.', {}, '')
      _logger.info('Encounter unknown request: %s.', action)

  def do_GET(self):
    """Handles GET requests; '/kill' and '/ping' are supported."""
    parsed_path = urllib.parse.urlparse(self.path)
    action = parsed_path.path
    params = urllib.parse.parse_qs(parsed_path.query, keep_blank_values=1)
    _logger.info('Action for GET method is: %s.', action)
    for param, values in params.items():
      _logger.info('%s=%s', param, values[0])
    if action == '/kill':
      self._KillTestServer(params)
    elif action == '/ping':
      # The ping handler is used to check whether the spawner server is ready
      # to serve the requests. We don't need to test the status of the test
      # server when handling ping request.
      self._SendResponse(200, 'OK', {}, 'ready')
      _logger.info('Handled ping request and sent response.')
    else:
      self._SendResponse(400, 'Unknown request', {}, '')
      _logger.info('Encounter unknown request: %s.', action)


class SpawningServer(object):
  """The class used to start/stop a http server."""

  def __init__(self, test_server_spawner_port, port_forwarder, max_instances):
    """Binds the spawner HTTP server and attaches the shared state to it.

    Args:
      test_server_spawner_port: port to listen on.
      port_forwarder: object providing the PortForwarder interface.
      max_instances: maximum number of test servers running at once.
    """
    self.server = HTTPServer(('', test_server_spawner_port),
                             SpawningServerRequestHandler)
    self.server_port = self.server.server_port
    _logger.info('Started test server spawner on port: %d.', self.server_port)

    # State read by SpawningServerRequestHandler through self.server.
    self.server.port_forwarder = port_forwarder
    self.server.test_servers = {}
    self.server.max_instances = max_instances

  def _Listen(self):
    _logger.info('Starting test server spawner.')
    self.server.serve_forever()

  def Start(self):
    """Starts the test server spawner."""
    listener_thread = threading.Thread(target=self._Listen)
    listener_thread.daemon = True
    listener_thread.start()

  def Stop(self):
    """Stops the test server spawner.

    Also cleans the server state.
    """
    self.CleanupState()
    self.server.shutdown()

  def CleanupState(self):
    """Cleans up the spawning server state.

    This should be called if the test server spawner is reused,
    to avoid sharing the test server instance.
    """
    if self.server.test_servers:
      _logger.warning('Not all test servers were stopped.')
      for port, test_server in self.server.test_servers.items():
        _logger.warning('Stopping test server on port %d', port)
        test_server.Stop()
      self.server.test_servers = {}