#!/usr/bin/env python2.7
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Manage TCP ports for unit tests; started by run_tests.py"""

import argparse
try:
    from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
except ImportError:  # Python 3 module name
    from http.server import HTTPServer, BaseHTTPRequestHandler
import hashlib
import os
import socket
import sys
import time
import random
try:
    from SocketServer import ThreadingMixIn
except ImportError:  # Python 3 module name
    from socketserver import ThreadingMixIn
import threading
import platform

# increment this number whenever making a change to ensure that
# the changes are picked up by running CI servers
# note that all changes must be backwards compatible
_MY_VERSION = 21

# Ports that are free to hand out, in the order they will be handed out.
pool = []
# Maps each handed-out port to the time.time() value at which it was allocated.
in_use = {}
# Guards every read and write of `pool` and `in_use`.
mu = threading.Lock()

# Cronet restricts the following ports to be used (see
# https://cs.chromium.org/chromium/src/net/base/port_util.cc). When one of these
# ports is used in a Cronet test, the test would fail (see issue #12149). These
# ports must be excluded from pool.
cronet_restricted_ports = [
    1, 7, 9, 11, 13, 15, 17, 19, 20, 21, 22, 23, 25, 37, 42, 43, 53, 77, 79, 87,
    95, 101, 102, 103, 104, 109, 110, 111, 113, 115, 117, 119, 123, 135, 139,
    143, 179, 389, 465, 512, 513, 514, 515, 526, 530, 531, 532, 540, 556, 563,
    587, 601, 636, 993, 995, 2049, 3659, 4045, 6000, 6665, 6666, 6667, 6668,
    6669, 6697
]


def _to_bytes(text):
    """Return `text` as bytes, encoding it as UTF-8 unless it already is bytes."""
    if isinstance(text, bytes):
        return text
    return text.encode('utf-8')


def can_connect(port):
    """Return True if a TCP connection to localhost:`port` succeeds.

    Always returns False on Windows, where the probe is skipped.
    """
    # this test is only really useful on unices where SO_REUSE_PORT is available
    # so on Windows, where this test is expensive, skip it
    if platform.system() == 'Windows':
        return False
    s = socket.socket()
    try:
        s.connect(('localhost', port))
        return True
    except socket.error:
        return False
    finally:
        s.close()


def can_bind(port, proto):
    """Return True if a stream socket of family `proto` can bind localhost:`port`.

    Args:
      port: TCP port number to try.
      proto: address family passed to socket.socket (e.g. socket.AF_INET).
    """
    s = socket.socket(proto, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        s.bind(('localhost', port))
        return True
    except socket.error:
        return False
    finally:
        s.close()


def refill_pool(max_timeout, req):
    """Scan for ports not marked for being in use and append them to `pool`.

    Candidates are the ports in [1025, 32766) that are not Cronet-restricted,
    visited in random order until `pool` holds more than 100 entries. The
    caller is expected to hold `mu`.

    Args:
      max_timeout: age in seconds after which an `in_use` entry is discarded
        and its port becomes a candidate again.
      req: object whose log_message(str) method receives progress messages.
    """
    # Build the exclusion set once so each membership test is O(1).
    restricted = frozenset(cronet_restricted_ports)
    chk = [port for port in range(1025, 32766) if port not in restricted]
    random.shuffle(chk)
    for i in chk:
        if len(pool) > 100:
            break
        if i in in_use:
            age = time.time() - in_use[i]
            if age < max_timeout:
                continue
            req.log_message("kill old request %d" % i)
            del in_use[i]
        if (can_bind(i, socket.AF_INET) and can_bind(i, socket.AF_INET6) and
                not can_connect(i)):
            req.log_message("found available port %d" % i)
            pool.append(i)


def allocate_port(req):
    """Take the first port from `pool`, record it in `in_use`, and return it.

    When `pool` is empty it is refilled first. If no port can be found the
    function sleeps one second (without holding `mu`), halves the age at which
    in-use ports are reclaimed (starting from 600 seconds), and retries until
    a port is available.

    Args:
      req: object whose log_message(str) method receives progress messages.

    Returns:
      The allocated port number.
    """
    max_timeout = 600
    while True:
        # `with` guarantees the lock is released even if refill_pool raises;
        # otherwise every later request would block on `mu` forever.
        with mu:
            if not pool:
                refill_pool(max_timeout, req)
            if pool:
                port = pool.pop(0)
                in_use[port] = time.time()
                return port
            req.log_message("failed to find ports: retrying soon")
        time.sleep(1)
        # Floor division keeps the integer halving on both Python 2 and 3.
        max_timeout //= 2


# Not read anywhere in this file; kept so the module-level name still exists.
keep_running = True


class Handler(BaseHTTPRequestHandler):
    """HTTP front end for the port pool.

    Supported GET paths:
      /get             allocate a port and return it as decimal text
      /drop/<port>     return a previously allocated port to the pool
      /version_number  return _MY_VERSION as decimal text
      /dump            return a YAML dump of the pool and in-use ports
      /quitquitquit    shut the server down
    """

    def setup(self):
        # If the client is unreachable for 5 seconds, close the connection
        self.timeout = 5
        BaseHTTPRequestHandler.setup(self)

    def _send_text_headers(self):
        """Send a 200 status line and a text/plain Content-Type header."""
        self.send_response(200)
        self.send_header('Content-Type', 'text/plain')
        self.end_headers()

    def do_GET(self):
        """Dispatch a GET request on its path (see the class docstring)."""
        if self.path == '/get':
            # allocate a new port; it stays marked in use until it is dropped,
            # or until a refill reclaims it for being older than the timeout
            self._send_text_headers()
            p = allocate_port(self)
            self.log_message('allocated port %d' % p)
            self.wfile.write(_to_bytes('%d' % p))
        elif self.path.startswith('/drop/'):
            # Validate before sending any status so a malformed port number
            # yields a 400 instead of an exception after a 200 was sent.
            try:
                p = int(self.path[6:])
            except ValueError:
                self.send_error(400, 'invalid port')
                return
            self._send_text_headers()
            with mu:
                if p in in_use:
                    del in_use[p]
                    pool.append(p)
                    k = 'known'
                else:
                    k = 'unknown'
            self.log_message('drop %s port %d' % (k, p))
        elif self.path == '/version_number':
            # fetch the version number of this server
            self._send_text_headers()
            self.wfile.write(_to_bytes('%d' % _MY_VERSION))
        elif self.path == '/dump':
            # yaml module is not installed on Macs and Windows machines by default
            # so we import it lazily (/dump action is only used for debugging)
            import yaml
            self._send_text_headers()
            with mu:
                now = time.time()
                out = yaml.dump({
                    'pool': pool,
                    # report the age of each allocation rather than its timestamp
                    'in_use': dict((k, now - v) for k, v in in_use.items()),
                })
            self.wfile.write(_to_bytes(out))
        elif self.path == '/quitquitquit':
            self.send_response(200)
            self.end_headers()
            self.server.shutdown()
        else:
            # Answer unknown paths explicitly instead of closing the
            # connection without any HTTP response.
            self.send_error(404)


class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """Handle requests in a separate thread"""


def main():
    """Parse the command line and serve requests until shut down.

    Invoked with the single argument `dump_version`, prints _MY_VERSION and
    exits instead of starting the server.
    """
    if len(sys.argv) == 2 and sys.argv[1] == 'dump_version':
        print(_MY_VERSION)
        sys.exit(0)

    # NOTE(review): the description mentions httpcli_test although this is the
    # port server; the text is left unchanged here.
    argp = argparse.ArgumentParser(description='Server for httpcli_test')
    argp.add_argument('-p', '--port', default=12345, type=int)
    argp.add_argument('-l', '--logfile', default=None, type=str)
    args = argp.parse_args()

    if args.logfile is not None:
        # Drop the inherited standard streams and send all output to the log.
        sys.stdin.close()
        sys.stderr.close()
        sys.stdout.close()
        sys.stderr = open(args.logfile, 'w')
        sys.stdout = sys.stderr

    print('port server running on port %d' % args.port)

    ThreadedHTTPServer(('', args.port), Handler).serve_forever()


if __name__ == '__main__':
    main()