# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""A "Test Server Spawner" that handles killing/stopping per-test test servers.

It's used to accept requests from the device to spawn and kill instances of the
chrome test server on the host.
"""
# pylint: disable=W0702

import BaseHTTPServer
import json
import logging
import os
import select
import struct
import subprocess
import sys
import threading
import time
import urlparse

from devil.android import forwarder
from devil.android import ports

from pylib import constants
from pylib.constants import host_paths


# Paths that are needed to import necessary modules when launching a
# testserver.
os.environ['PYTHONPATH'] = os.environ.get('PYTHONPATH', '') + (':%s:%s:%s:%s:%s'
    % (os.path.join(host_paths.DIR_SOURCE_ROOT, 'third_party'),
       os.path.join(host_paths.DIR_SOURCE_ROOT, 'third_party', 'tlslite'),
       os.path.join(host_paths.DIR_SOURCE_ROOT, 'third_party', 'pyftpdlib',
                    'src'),
       os.path.join(host_paths.DIR_SOURCE_ROOT, 'net', 'tools', 'testserver'),
       os.path.join(host_paths.DIR_SOURCE_ROOT, 'sync', 'tools', 'testserver')))


# Maps the server type requested by the device to the command-line switch
# that selects it. An empty string means no switch is added.
SERVER_TYPES = {
    'http': '',
    'ftp': '-f',
    'sync': '',  # Sync uses its own script, and doesn't take a server type arg.
    'tcpecho': '--tcp-echo',
    'udpecho': '--udp-echo',
}


# The timeout (in seconds) of starting up the Python test server.
TEST_SERVER_STARTUP_TIMEOUT = 10


def _WaitUntil(predicate, max_attempts=5):
  """Polls |predicate| with exponential back-off until it returns true.

  Args:
    predicate: zero-argument callable; polling stops as soon as it returns a
        true value.
    max_attempts: maximum number of times |predicate| is evaluated.

  Returns:
    True if |predicate| was satisfied within |max_attempts| evaluations,
    False otherwise.
  """
  sleep_time_sec = 0.025
  for attempt in xrange(max_attempts):
    if predicate():
      return True
    # Only sleep when another evaluation follows; sleeping after the final
    # failed attempt would just delay the caller.
    if attempt + 1 < max_attempts:
      time.sleep(sleep_time_sec)
      sleep_time_sec = min(1, sleep_time_sec * 2)  # Don't wait more than 1 sec.
  return False


def _ReadExactly(fd, length, deadline):
  """Reads up to |length| bytes from |fd|, tolerating short reads.

  Args:
    fd: file descriptor to read from.
    length: number of bytes wanted.
    deadline: absolute time (as returned by time.time()) after which no
        further waiting for data is done.

  Returns:
    The bytes read. The result is shorter than |length| only if the deadline
    expired or end-of-file was reached first.
  """
  chunks = []
  remaining = length
  while remaining > 0:
    timeout = max(0, deadline - time.time())
    (readable, _, _) = select.select([fd], [], [], timeout)
    if not readable:
      break
    chunk = os.read(fd, remaining)
    if not chunk:
      break
    chunks.append(chunk)
    remaining -= len(chunk)
  return b''.join(chunks)


def _CheckPortAvailable(port):
  """Returns True if |port| is available."""
  return _WaitUntil(lambda: ports.IsHostPortAvailable(port))


def _CheckPortNotAvailable(port):
  """Returns True if |port| is not available."""
  return _WaitUntil(lambda: not ports.IsHostPortAvailable(port))


def _CheckDevicePortStatus(device, port):
  """Returns whether the provided port is used."""
  return _WaitUntil(lambda: ports.IsDevicePortUsed(device, port))


def _GetServerTypeCommandLine(server_type):
  """Returns the command-line by the given server type.

  Args:
    server_type: the server type to be used (e.g. 'http').

  Returns:
    A string containing the command-line argument.

  Raises:
    NotImplementedError: if |server_type| is not a key of SERVER_TYPES.
    Exception: if |server_type| is 'udpecho', which cannot be forwarded.
  """
  if server_type not in SERVER_TYPES:
    raise NotImplementedError('Unknown server type: %s' % server_type)
  if server_type == 'udpecho':
    raise Exception('Please do not run UDP echo tests because we do not have '
                    'a UDP forwarder tool.')
  return SERVER_TYPES[server_type]


class TestServerThread(threading.Thread):
  """A thread to run the test server in a separate process."""

  def __init__(self, ready_event, arguments, device, tool):
    """Initialize TestServerThread with the following argument.

    Args:
      ready_event: event which will be set when the test server is ready.
      arguments: dictionary of arguments to run the test server.
      device: An instance of DeviceUtils.
      tool: instance of runtime error detection tool.
    """
    threading.Thread.__init__(self)
    # Set by run() once all clean-up is done; Stop() blocks on it.
    self.wait_event = threading.Event()
    self.stop_flag = False
    self.ready_event = ready_event
    self.ready_event.clear()
    self.arguments = arguments
    self.device = device
    self.tool = tool
    self.test_server_process = None
    self.is_ready = False
    self.host_port = self.arguments['port']
    assert isinstance(self.host_port, int)
    # The forwarder device port now is dynamically allocated.
    self.forwarder_device_port = 0
    # Device port of the forwarder mapping created by this thread, recorded
    # even when the readiness check fails so that it can still be unmapped.
    self._mapped_device_port = 0
    # Anonymous pipe in order to get port info from test server.
    self.pipe_in = None
    self.pipe_out = None
    self.process = None
    self.command_line = []

  def _WaitToStartAndGetPortFromTestServer(self):
    """Waits for the Python test server to start and gets the port it is using.

    The port information is passed by the Python test server with a pipe given
    by self.pipe_out. It is written as a result to |self.host_port|.

    Returns:
      Whether the port used by the test server was successfully fetched.
    """
    assert self.host_port == 0 and self.pipe_out and self.pipe_in
    deadline = time.time() + TEST_SERVER_STARTUP_TIMEOUT
    # First read the data length as an unsigned 4-byte value.  This
    # is _not_ using network byte ordering since the Python test server packs
    # size as native byte order and all Chromium platforms so far are
    # configured to use little-endian.
    # TODO(jnd): Change the Python test server and local_test_server_*.cc to
    # use a unified byte order (either big-endian or little-endian).
    header_size = struct.calcsize('=L')
    header = _ReadExactly(self.pipe_in, header_size, deadline)
    if not header:
      logging.error('Failed to wait to the Python test server to be started.')
      return False
    if len(header) != header_size:
      logging.error('Failed to get length of server data.')
      return False
    (data_length,) = struct.unpack('=L', header)
    if not data_length:
      logging.error('Failed to get length of server data.')
      return False
    port_json = _ReadExactly(self.pipe_in, data_length, deadline)
    if len(port_json) != data_length:
      logging.error('Failed to get server data.')
      return False
    logging.info('Got port json data: %s', port_json)
    try:
      port_info = json.loads(port_json)
    except ValueError:
      port_info = None
    if (isinstance(port_info, dict) and
        isinstance(port_info.get('port'), int)):
      self.host_port = port_info['port']
      return _CheckPortNotAvailable(self.host_port)
    logging.error('Failed to get port information from the server data.')
    return False

  def _GenerateCommandLineArguments(self):
    """Generates the command line to run the test server.

    Note that all options are processed by following the definitions in
    testserver.py. The result is stored in |self.command_line|; nothing is
    done if it has already been generated.
    """
    if self.command_line:
      return

    args_copy = dict(self.arguments)

    # Translate the server type.
    type_cmd = _GetServerTypeCommandLine(args_copy.pop('server-type'))
    if type_cmd:
      self.command_line.append(type_cmd)

    # Use a pipe to get the port given by the instance of Python test server
    # if the test does not specify the port.
    assert self.host_port == args_copy['port']
    if self.host_port == 0:
      (self.pipe_in, self.pipe_out) = os.pipe()
      self.command_line.append('--startup-pipe=%d' % self.pipe_out)

    # Pass the remaining arguments as-is. A None value yields a bare switch,
    # and a list value repeats the switch once per element.
    for key, values in args_copy.iteritems():
      if not isinstance(values, list):
        values = [values]
      for value in values:
        if value is None:
          self.command_line.append('--%s' % key)
        else:
          self.command_line.append('--%s=%s' % (key, value))

  def _CloseUnnecessaryFDsForTestServerProcess(self):
    """Closes every low-numbered descriptor except the startup pipe.

    Passed to subprocess.Popen as preexec_fn.
    """
    # This is required to avoid subtle deadlocks that could be caused by the
    # test server child process inheriting undesirable file descriptors such as
    # file lock file descriptors.
    for fd in xrange(0, 1024):
      if fd != self.pipe_out:
        try:
          os.close(fd)
        except OSError:
          # Most descriptors in the range are simply not open.
          pass

  def _StartServerAndForwarder(self):
    """Launches the test server process and forwards its port to the device.

    On success |self.is_ready| is True and |self.forwarder_device_port| holds
    the device port mapped to |self.host_port|. On failure |self.is_ready| is
    left False.
    """
    self._GenerateCommandLineArguments()
    command = host_paths.DIR_SOURCE_ROOT
    if self.arguments['server-type'] == 'sync':
      command = [os.path.join(command, 'sync', 'tools', 'testserver',
                              'sync_testserver.py')] + self.command_line
    else:
      command = [os.path.join(command, 'net', 'tools', 'testserver',
                              'testserver.py')] + self.command_line
    logging.info('Running: %s', command)

    # Disable PYTHONUNBUFFERED because it has a bad interaction with the
    # testserver. Remove once this interaction is fixed.
    unbuf = os.environ.pop('PYTHONUNBUFFERED', None)
    try:
      # Pass DIR_SOURCE_ROOT as the child's working directory so that relative
      # paths in the arguments are resolved correctly.
      self.process = subprocess.Popen(
          command, preexec_fn=self._CloseUnnecessaryFDsForTestServerProcess,
          cwd=host_paths.DIR_SOURCE_ROOT)
    finally:
      # Restore the variable even if the launch failed.
      if unbuf is not None:
        os.environ['PYTHONUNBUFFERED'] = unbuf

    if self.pipe_out is not None:
      server_started = self._WaitToStartAndGetPortFromTestServer()
    else:
      server_started = _CheckPortNotAvailable(self.host_port)
    if not server_started:
      return

    forwarder.Forwarder.Map([(0, self.host_port)], self.device, self.tool)
    # Check whether the forwarder is ready on the device.
    device_port = forwarder.Forwarder.DevicePortForHostPort(self.host_port)
    if not device_port:
      return
    # Remember the mapping before verifying it, so that it gets unmapped even
    # if the verification below fails.
    self._mapped_device_port = device_port
    if _CheckDevicePortStatus(self.device, device_port):
      self.forwarder_device_port = device_port
      self.is_ready = True

  def _TearDown(self):
    """Kills the test server, removes the port mapping and closes the pipe."""
    try:
      if self.process:
        if self.process.poll() is None:
          self.process.kill()
          # Reap the child so that it does not linger as a zombie.
          self.process.wait()
      if self._mapped_device_port:
        forwarder.Forwarder.UnmapDevicePort(self._mapped_device_port,
                                            self.device)
        self._mapped_device_port = 0
    finally:
      self.process = None
      self.is_ready = False
      if self.pipe_out is not None:
        os.close(self.pipe_in)
        os.close(self.pipe_out)
        self.pipe_in = None
        self.pipe_out = None

  def run(self):
    """Runs the test server until Stop() is called.

    |self.ready_event| is always set once start-up has finished, whether it
    succeeded or not, and |self.wait_event| is always set once clean-up has
    finished, so that threads blocked on either event are never left hanging.
    """
    logging.info('Start running the thread!')
    self.wait_event.clear()
    try:
      try:
        self._StartServerAndForwarder()
      finally:
        # Wake up the request handler thread, even if start-up raised.
        self.ready_event.set()
      # Keep thread running until Stop() gets called.
      _WaitUntil(lambda: self.stop_flag, max_attempts=sys.maxint)
    finally:
      try:
        self._TearDown()
      finally:
        logging.info('Test-server has died.')
        self.wait_event.set()

  def Stop(self):
    """Blocks until the loop has finished.

    Note that this must be called in another thread.
    """
    if not self.process:
      return
    self.stop_flag = True
    self.wait_event.wait()


class SpawningServerRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
  """A handler used to process http GET/POST request."""

  def _SendResponse(self, response_code, response_reason, additional_headers,
                    contents):
    """Generates a response sent to the client from the provided parameters.

    Args:
      response_code: number of the response status.
      response_reason: string of reason description of the response.
      additional_headers: dict of additional headers. Each key is the name of
                          the header, each value is the content of the header.
      contents: string of the contents we want to send to client.
    """
    self.send_response(response_code, response_reason)
    self.send_header('Content-Type', 'text/html')
    # Specify the content-length as without it the http(s) response will not
    # be completed properly (and the browser keeps expecting data).
    self.send_header('Content-Length', len(contents))
    for header_name in additional_headers:
      self.send_header(header_name, additional_headers[header_name])
    self.end_headers()
    self.wfile.write(contents)
    self.wfile.flush()

  def _StartTestServer(self):
    """Starts the test server thread.

    The request body is a JSON dictionary of test server arguments. Responds
    with 200 and a JSON body holding the forwarded device port on success, or
    with 500 if the test server could not be brought up.

    Raises:
      Exception: if the content-type or content-length header is invalid.
    """
    logging.info('Handling request to spawn a test server.')
    content_type = self.headers.getheader('content-type')
    if content_type != 'application/json':
      raise Exception('Bad content-type for start request.')
    content_length = self.headers.getheader('content-length')
    if not content_length:
      content_length = 0
    try:
      content_length = int(content_length)
    except ValueError:
      raise Exception('Bad content-length for start request.')
    logging.info(content_length)
    test_server_argument_json = self.rfile.read(content_length)
    logging.info(test_server_argument_json)
    assert not self.server.test_server_instance
    ready_event = threading.Event()
    self.server.test_server_instance = TestServerThread(
        ready_event,
        json.loads(test_server_argument_json),
        self.server.device,
        self.server.tool)
    self.server.test_server_instance.setDaemon(True)
    self.server.test_server_instance.start()
    # Block until the thread reports that start-up has finished.
    ready_event.wait()
    if self.server.test_server_instance.is_ready:
      self._SendResponse(200, 'OK', {}, json.dumps(
          {'port': self.server.test_server_instance.forwarder_device_port,
           'message': 'started'}))
      logging.info('Test server is running on port: %d.',
                   self.server.test_server_instance.host_port)
    else:
      self.server.test_server_instance.Stop()
      self.server.test_server_instance = None
      self._SendResponse(500, 'Test Server Error.', {}, '')
      logging.info('Encounter problem during starting a test server.')

  def _KillTestServer(self):
    """Stops the test server instance."""
    # There should only ever be one test server at a time. This may do the
    # wrong thing if we try and start multiple test servers.
    # NOTE(review): when no instance is running this returns without writing
    # any response - confirm that the client side copes with that.
    if not self.server.test_server_instance:
      return
    port = self.server.test_server_instance.host_port
    logging.info('Handling request to kill a test server on port: %d.', port)
    self.server.test_server_instance.Stop()
    # Make sure the status of test server is correct before sending response.
    if _CheckPortAvailable(port):
      self._SendResponse(200, 'OK', {}, 'killed')
      logging.info('Test server on port %d is killed', port)
    else:
      self._SendResponse(500, 'Test Server Error.', {}, '')
      logging.info('Encounter problem during killing a test server.')
    self.server.test_server_instance = None

  def do_POST(self):
    """Handles POST requests; only '/start' is supported."""
    parsed_path = urlparse.urlparse(self.path)
    action = parsed_path.path
    logging.info('Action for POST method is: %s.', action)
    if action == '/start':
      self._StartTestServer()
    else:
      self._SendResponse(400, 'Unknown request.', {}, '')
      logging.info('Encounter unknown request: %s.', action)

  def do_GET(self):
    """Handles GET requests; '/kill' and '/ping' are supported."""
    parsed_path = urlparse.urlparse(self.path)
    action = parsed_path.path
    params = urlparse.parse_qs(parsed_path.query, keep_blank_values=1)
    logging.info('Action for GET method is: %s.', action)
    for param in params:
      logging.info('%s=%s', param, params[param][0])
    if action == '/kill':
      self._KillTestServer()
    elif action == '/ping':
      # The ping handler is used to check whether the spawner server is ready
      # to serve the requests. We don't need to test the status of the test
      # server when handling ping request.
      self._SendResponse(200, 'OK', {}, 'ready')
      logging.info('Handled ping request and sent response.')
    else:
      self._SendResponse(400, 'Unknown request', {}, '')
      logging.info('Encounter unknown request: %s.', action)


class SpawningServer(object):
  """The class used to start/stop a http server."""

  def __init__(self, test_server_spawner_port, device, tool):
    """Creates the HTTP server that listens for spawn/kill requests.

    Args:
      test_server_spawner_port: host port the spawner listens on.
      device: handed to every TestServerThread this spawner creates.
      tool: handed to every TestServerThread this spawner creates.
    """
    logging.info('Creating new spawner on port: %d.', test_server_spawner_port)
    self.server = BaseHTTPServer.HTTPServer(('', test_server_spawner_port),
                                            SpawningServerRequestHandler)
    # The request handlers reach this state through |self.server|.
    self.server.device = device
    self.server.tool = tool
    self.server.test_server_instance = None
    self.server.build_type = constants.GetBuildType()

  def _Listen(self):
    """Serves requests until the server is shut down."""
    logging.info('Starting test server spawner')
    self.server.serve_forever()

  def Start(self):
    """Starts the test server spawner."""
    listener_thread = threading.Thread(target=self._Listen)
    listener_thread.setDaemon(True)
    listener_thread.start()

  def Stop(self):
    """Stops the test server spawner.

    Also cleans the server state.
    """
    self.CleanupState()
    self.server.shutdown()

  def CleanupState(self):
    """Cleans up the spawning server state.

    This should be called if the test server spawner is reused,
    to avoid sharing the test server instance.
    """
    if self.server.test_server_instance:
      self.server.test_server_instance.Stop()
      self.server.test_server_instance = None