# Copyright 2013 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""A "Test Server Spawner" that handles killing/stopping per-test test servers.

It's used to accept requests from the device to spawn and kill instances of the
chrome test server on the host.
"""

import BaseHTTPServer
import json
import logging
import os
import select
import struct
import subprocess
import sys
import threading
import time
import urlparse

import constants
import ports

from pylib.forwarder import Forwarder

# Paths that are needed to import the necessary modules when launching a
# testserver.
os.environ['PYTHONPATH'] = os.environ.get('PYTHONPATH', '') + (':%s:%s:%s:%s:%s'
    % (os.path.join(constants.DIR_SOURCE_ROOT, 'third_party'),
       os.path.join(constants.DIR_SOURCE_ROOT, 'third_party', 'tlslite'),
       os.path.join(constants.DIR_SOURCE_ROOT, 'third_party', 'pyftpdlib',
                    'src'),
       os.path.join(constants.DIR_SOURCE_ROOT, 'net', 'tools', 'testserver'),
       os.path.join(constants.DIR_SOURCE_ROOT, 'sync', 'tools', 'testserver')))


# Maps each supported server type to the command-line flag selecting it.
SERVER_TYPES = {
    'http': '',
    'ftp': '-f',
    'sync': '',  # Sync uses its own script, and doesn't take a server type arg.
    'tcpecho': '--tcp-echo',
    'udpecho': '--udp-echo',
}


# The timeout (in seconds) of starting up the Python test server.
TEST_SERVER_STARTUP_TIMEOUT = 10


def _WaitUntil(predicate, max_attempts=5):
  """Polls the provided predicate (function) until it returns a true value.

  The delay between polls starts at 25ms and doubles after every failed
  attempt, capped at one second.

  Args:
    predicate: callable taking no arguments.
    max_attempts: maximum number of times the predicate is evaluated.

  Returns:
    Whether the provided predicate was satisfied once (before running out of
    attempts).
  """
  sleep_time_sec = 0.025
  # Evaluate the predicate exactly |max_attempts| times; starting the range at
  # 1 would silently drop one attempt.
  for _ in xrange(max_attempts):
    if predicate():
      return True
    time.sleep(sleep_time_sec)
    sleep_time_sec = min(1, sleep_time_sec * 2)  # Don't wait more than 1 sec.
  return False


def _CheckPortStatus(port, expected_status):
  """Returns True if port has expected_status.

  Args:
    port: the port number.
    expected_status: boolean of expected status.

  Returns:
    Returns True if the status is expected. Otherwise returns False.
  """
  return _WaitUntil(lambda: ports.IsHostPortUsed(port) == expected_status)


def _CheckDevicePortStatus(adb, port):
  """Returns whether the provided port is used."""
  return _WaitUntil(lambda: ports.IsDevicePortUsed(adb, port))


def _GetServerTypeCommandLine(server_type):
  """Returns the command-line by the given server type.

  Args:
    server_type: the server type to be used (e.g. 'http').

  Returns:
    A string containing the command-line argument.

  Raises:
    NotImplementedError: if server_type is not a key of SERVER_TYPES.
    Exception: if server_type is 'udpecho', which cannot be forwarded.
  """
  if server_type not in SERVER_TYPES:
    raise NotImplementedError('Unknown server type: %s' % server_type)
  if server_type == 'udpecho':
    raise Exception('Please do not run UDP echo tests because we do not have '
                    'a UDP forwarder tool.')
  return SERVER_TYPES[server_type]


class TestServerThread(threading.Thread):
  """A thread to run the test server in a separate process."""

  def __init__(self, ready_event, arguments, adb, tool):
    """Initialize TestServerThread with the following argument.

    Args:
      ready_event: event which will be set when the test server is ready.
      arguments: dictionary of arguments to run the test server.
      adb: instance of AndroidCommands.
      tool: instance of runtime error detection tool.
    """
    threading.Thread.__init__(self)
    self.wait_event = threading.Event()
    self.stop_flag = False
    self.ready_event = ready_event
    self.ready_event.clear()
    self.arguments = arguments
    self.adb = adb
    self.tool = tool
    # |process| is what run() and Stop() actually use; it must exist before
    # the thread starts so that an early Stop() does not raise AttributeError.
    self.process = None
    # Not read anywhere in this file; kept for backward compatibility.
    self.test_server_process = None
    self.is_ready = False
    self.host_port = self.arguments['port']
    assert isinstance(self.host_port, int)
    # The forwarder device port now is dynamically allocated.
    self.forwarder_device_port = 0
    # Anonymous pipe in order to get port info from test server.
    self.pipe_in = None
    self.pipe_out = None
    self.command_line = []

  def _WaitToStartAndGetPortFromTestServer(self):
    """Waits for the Python test server to start and gets the port it is using.

    The port information is passed by the Python test server with a pipe given
    by self.pipe_out. It is written as a result to |self.host_port|.

    Returns:
      Whether the port used by the test server was successfully fetched.
    """
    assert self.host_port == 0 and self.pipe_out and self.pipe_in
    (in_fds, _, _) = select.select([self.pipe_in], [], [],
                                   TEST_SERVER_STARTUP_TIMEOUT)
    if not in_fds:
      logging.error('Failed to wait to the Python test server to be started.')
      return False
    # First read the data length as an unsigned 4-byte value.  This
    # is _not_ using network byte ordering since the Python test server packs
    # size as native byte order and all Chromium platforms so far are
    # configured to use little-endian.
    # TODO(jnd): Change the Python test server and local_test_server_*.cc to
    # use a unified byte order (either big-endian or little-endian).
    header_size = struct.calcsize('=L')
    header = os.read(self.pipe_in, header_size)
    data_length = 0
    # A short or empty read (e.g. the server exited early) is reported through
    # the error path below rather than through struct.error or an assert.
    if len(header) == header_size:
      (data_length,) = struct.unpack('=L', header)
    if not data_length:
      logging.error('Failed to get length of server data.')
      return False
    port_json = os.read(self.pipe_in, data_length)
    if not port_json:
      logging.error('Failed to get server data.')
      return False
    logging.info('Got port json data: %s', port_json)
    port_json = json.loads(port_json)
    if 'port' in port_json and isinstance(port_json['port'], int):
      self.host_port = port_json['port']
      return _CheckPortStatus(self.host_port, True)
    logging.error('Failed to get port information from the server data.')
    return False

  def _GenerateCommandLineArguments(self):
    """Generates the command line to run the test server.

    Note that all options are processed by following the definitions in
    testserver.py. The result is stored in |self.command_line|; nothing is
    done if it has already been generated.
    """
    if self.command_line:
      return
    args = self.arguments
    cmd = self.command_line
    # The following arguments must exist.
    type_cmd = _GetServerTypeCommandLine(args['server-type'])
    if type_cmd:
      cmd.append(type_cmd)
    cmd.append('--port=%d' % self.host_port)
    # Use a pipe to get the port given by the instance of Python test server
    # if the test does not specify the port.
    if self.host_port == 0:
      (self.pipe_in, self.pipe_out) = os.pipe()
      cmd.append('--startup-pipe=%d' % self.pipe_out)
    cmd.append('--host=%s' % args['host'])
    data_dir = args['data-dir'] or 'chrome/test/data'
    if not os.path.isabs(data_dir):
      data_dir = os.path.join(constants.DIR_SOURCE_ROOT, data_dir)
    cmd.append('--data-dir=%s' % data_dir)
    # The following arguments are optional depending on the individual test.
    if 'log-to-console' in args:
      cmd.append('--log-to-console')
    if 'auth-token' in args:
      cmd.append('--auth-token=%s' % args['auth-token'])
    if 'https' in args:
      cmd.append('--https')
    if 'cert-and-key-file' in args:
      cmd.append('--cert-and-key-file=%s' % os.path.join(
          constants.DIR_SOURCE_ROOT, args['cert-and-key-file']))
    if 'ocsp' in args:
      cmd.append('--ocsp=%s' % args['ocsp'])
    if 'https-record-resume' in args:
      cmd.append('--https-record-resume')
    if 'ssl-client-auth' in args:
      cmd.append('--ssl-client-auth')
    if 'tls-intolerant' in args:
      cmd.append('--tls-intolerant=%s' % args['tls-intolerant'])
    if 'ssl-client-ca' in args:
      for ca in args['ssl-client-ca']:
        cmd.append('--ssl-client-ca=%s' %
                   os.path.join(constants.DIR_SOURCE_ROOT, ca))
    if 'ssl-bulk-cipher' in args:
      for bulk_cipher in args['ssl-bulk-cipher']:
        cmd.append('--ssl-bulk-cipher=%s' % bulk_cipher)

  def _CloseUnnecessaryFDsForTestServerProcess(self):
    """Closes every descriptor in [0, 1024) except |self.pipe_out|.

    Passed to subprocess.Popen as preexec_fn by this class.
    """
    # This is required to avoid subtle deadlocks that could be caused by the
    # test server child process inheriting undesirable file descriptors such as
    # file lock file descriptors.
    for fd in xrange(0, 1024):
      if fd != self.pipe_out:
        try:
          os.close(fd)
        except OSError:
          # Most descriptors in the range are simply not open.
          pass

  def _StartProcessAndForwarder(self):
    """Spawns the test server process and maps its port onto the device.

    Sets |self.process|, |self.is_ready| and, on success,
    |self.forwarder_device_port|.
    """
    self._GenerateCommandLineArguments()
    if self.arguments['server-type'] == 'sync':
      script = os.path.join(constants.DIR_SOURCE_ROOT, 'sync', 'tools',
                            'testserver', 'sync_testserver.py')
    else:
      script = os.path.join(constants.DIR_SOURCE_ROOT, 'net', 'tools',
                            'testserver', 'testserver.py')
    command = [script] + self.command_line
    logging.info('Running: %s', command)
    self.process = subprocess.Popen(
        command, preexec_fn=self._CloseUnnecessaryFDsForTestServerProcess)
    if self.pipe_out:
      self.is_ready = self._WaitToStartAndGetPortFromTestServer()
    else:
      self.is_ready = _CheckPortStatus(self.host_port, True)
    if not self.is_ready:
      return
    Forwarder.Map([(0, self.host_port)], self.adb, self.tool)
    # Check whether the forwarder is ready on the device.
    self.is_ready = False
    device_port = Forwarder.DevicePortForHostPort(self.host_port)
    if device_port and _CheckDevicePortStatus(self.adb, device_port):
      self.is_ready = True
      self.forwarder_device_port = device_port

  def run(self):
    """Starts the test server, then blocks until Stop() asks it to shut down.

    |self.ready_event| is always set once the start-up attempt has finished
    (check |self.is_ready| for the outcome) and |self.wait_event| is always set
    when the thread is done, so that waiters can never block forever.
    """
    logging.info('Start running the thread!')
    self.wait_event.clear()
    try:
      try:
        self._StartProcessAndForwarder()
      except Exception:
        # Without this the thread would die silently and the request handler
        # would wait on |ready_event| forever.
        logging.exception('Failed to start the test server.')
        self.is_ready = False
      finally:
        # Wake up the request handler thread.
        self.ready_event.set()
      if self.process:
        # Keep thread running until Stop() gets called.
        _WaitUntil(lambda: self.stop_flag, max_attempts=sys.maxint)
        if self.process.poll() is None:
          self.process.kill()
        Forwarder.UnmapDevicePort(self.forwarder_device_port, self.adb)
        self.process = None
      self.is_ready = False
      if self.pipe_out:
        os.close(self.pipe_in)
        os.close(self.pipe_out)
        self.pipe_in = None
        self.pipe_out = None
      logging.info('Test-server has died.')
    finally:
      # Release Stop() even if the cleanup above raised.
      self.wait_event.set()

  def Stop(self):
    """Blocks until the loop has finished.

    Returns immediately if no test server process is running.

    Note that this must be called in another thread.
    """
    if not self.process:
      return
    self.stop_flag = True
    self.wait_event.wait()


class SpawningServerRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
  """A handler used to process http GET/POST request."""

  def _SendResponse(self, response_code, response_reason, additional_headers,
                    contents):
    """Generates a response sent to the client from the provided parameters.

    Args:
      response_code: number of the response status.
      response_reason: string of reason description of the response.
      additional_headers: dict of additional headers. Each key is the name of
                          the header, each value is the content of the header.
      contents: string of the contents we want to send to client.
    """
    self.send_response(response_code, response_reason)
    self.send_header('Content-Type', 'text/html')
    # Specify the content-length as without it the http(s) response will not
    # be completed properly (and the browser keeps expecting data).
    self.send_header('Content-Length', len(contents))
    for header_name in additional_headers:
      self.send_header(header_name, additional_headers[header_name])
    self.end_headers()
    self.wfile.write(contents)
    self.wfile.flush()

  def _StartTestServer(self):
    """Starts the test server thread.

    Reads the JSON arguments from the request body, starts a TestServerThread
    and replies 200 with the forwarded device port, or 500 on failure.

    Raises:
      Exception: if the content-type or content-length header is invalid.
    """
    logging.info('Handling request to spawn a test server.')
    content_type = self.headers.getheader('content-type')
    if content_type != 'application/json':
      raise Exception('Bad content-type for start request.')
    try:
      # A missing or empty header is treated as a zero-length body.
      content_length = int(self.headers.getheader('content-length') or 0)
    except ValueError:
      raise Exception('Bad content-length for start request.')
    logging.info(content_length)
    test_server_argument_json = self.rfile.read(content_length)
    logging.info(test_server_argument_json)
    assert not self.server.test_server_instance
    ready_event = threading.Event()
    self.server.test_server_instance = TestServerThread(
        ready_event,
        json.loads(test_server_argument_json),
        self.server.adb,
        self.server.tool)
    self.server.test_server_instance.setDaemon(True)
    self.server.test_server_instance.start()
    ready_event.wait()
    if self.server.test_server_instance.is_ready:
      self._SendResponse(200, 'OK', {}, json.dumps(
          {'port': self.server.test_server_instance.forwarder_device_port,
           'message': 'started'}))
      logging.info('Test server is running on port: %d.',
                   self.server.test_server_instance.host_port)
    else:
      self.server.test_server_instance.Stop()
      self.server.test_server_instance = None
      self._SendResponse(500, 'Test Server Error.', {}, '')
      logging.info('Encounter problem during starting a test server.')

  def _KillTestServer(self):
    """Stops the test server instance."""
    # There should only ever be one test server at a time. This may do the
    # wrong thing if we try and start multiple test servers.
    if not self.server.test_server_instance:
      # NOTE(review): no response is sent on this path; confirm whether the
      # client expects one.
      return
    port = self.server.test_server_instance.host_port
    logging.info('Handling request to kill a test server on port: %d.', port)
    self.server.test_server_instance.Stop()
    # Make sure the status of test server is correct before sending response.
    if _CheckPortStatus(port, False):
      self._SendResponse(200, 'OK', {}, 'killed')
      logging.info('Test server on port %d is killed', port)
    else:
      self._SendResponse(500, 'Test Server Error.', {}, '')
      logging.info('Encounter problem during killing a test server.')
    self.server.test_server_instance = None

  def do_POST(self):
    """Handles POST requests; only the '/start' action is supported."""
    parsed_path = urlparse.urlparse(self.path)
    action = parsed_path.path
    logging.info('Action for POST method is: %s.', action)
    if action == '/start':
      self._StartTestServer()
    else:
      self._SendResponse(400, 'Unknown request.', {}, '')
      logging.info('Encounter unknown request: %s.', action)

  def do_GET(self):
    """Handles GET requests for the '/kill' and '/ping' actions."""
    parsed_path = urlparse.urlparse(self.path)
    action = parsed_path.path
    params = urlparse.parse_qs(parsed_path.query, keep_blank_values=1)
    logging.info('Action for GET method is: %s.', action)
    for param in params:
      logging.info('%s=%s', param, params[param][0])
    if action == '/kill':
      self._KillTestServer()
    elif action == '/ping':
      # The ping handler is used to check whether the spawner server is ready
      # to serve the requests. We don't need to test the status of the test
      # server when handling ping request.
      self._SendResponse(200, 'OK', {}, 'ready')
      logging.info('Handled ping request and sent response.')
    else:
      self._SendResponse(400, 'Unknown request', {}, '')
      logging.info('Encounter unknown request: %s.', action)


class SpawningServer(object):
  """The class used to start/stop a http server."""

  def __init__(self, test_server_spawner_port, adb, tool):
    """Creates the HTTP server that listens for spawn/kill requests.

    Args:
      test_server_spawner_port: host port the spawner listens on.
      adb: stored on the server and handed to every TestServerThread.
      tool: stored on the server and handed to every TestServerThread.
    """
    logging.info('Creating new spawner on port: %d.', test_server_spawner_port)
    self.server = BaseHTTPServer.HTTPServer(('', test_server_spawner_port),
                                            SpawningServerRequestHandler)
    self.server.adb = adb
    self.server.tool = tool
    self.server.test_server_instance = None
    self.server.build_type = constants.GetBuildType()

  def _Listen(self):
    """Serves requests until the server is shut down."""
    logging.info('Starting test server spawner')
    self.server.serve_forever()

  def Start(self):
    """Starts the test server spawner."""
    listener_thread = threading.Thread(target=self._Listen)
    listener_thread.setDaemon(True)
    listener_thread.start()

  def Stop(self):
    """Stops the test server spawner.

    Also cleans the server state.
    """
    self.CleanupState()
    self.server.shutdown()

  def CleanupState(self):
    """Cleans up the spawning server state.

    This should be called if the test server spawner is reused,
    to avoid sharing the test server instance.
    """
    if self.server.test_server_instance:
      self.server.test_server_instance.Stop()
      self.server.test_server_instance = None