1#!/usr/bin/env python2.7 2# Copyright 2015 gRPC authors. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15"""Manage TCP ports for unit tests; started by run_tests.py""" 16 17from __future__ import print_function 18 19import argparse 20from six.moves.BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler 21from six.moves.socketserver import ThreadingMixIn 22import hashlib 23import os 24import socket 25import sys 26import time 27import random 28import threading 29import platform 30 31# increment this number whenever making a change to ensure that 32# the changes are picked up by running CI servers 33# note that all changes must be backwards compatible 34_MY_VERSION = 21 35 36if len(sys.argv) == 2 and sys.argv[1] == 'dump_version': 37 print(_MY_VERSION) 38 sys.exit(0) 39 40argp = argparse.ArgumentParser(description='Server for httpcli_test') 41argp.add_argument('-p', '--port', default=12345, type=int) 42argp.add_argument('-l', '--logfile', default=None, type=str) 43args = argp.parse_args() 44 45if args.logfile is not None: 46 sys.stdin.close() 47 sys.stderr.close() 48 sys.stdout.close() 49 sys.stderr = open(args.logfile, 'w') 50 sys.stdout = sys.stderr 51 52print('port server running on port %d' % args.port) 53 54pool = [] 55in_use = {} 56mu = threading.Lock() 57 58# Cronet restricts the following ports to be used (see 59# https://cs.chromium.org/chromium/src/net/base/port_util.cc). When one of these 60# ports is used in a Cronet test, the test would fail (see issue #12149). These 61# ports must be excluded from pool. 62cronet_restricted_ports = [ 63 1, 7, 9, 11, 13, 15, 17, 19, 20, 21, 22, 23, 25, 37, 42, 43, 53, 77, 79, 87, 64 95, 101, 102, 103, 104, 109, 110, 111, 113, 115, 117, 119, 123, 135, 139, 65 143, 179, 389, 465, 512, 513, 514, 515, 526, 530, 531, 532, 540, 556, 563, 66 587, 601, 636, 993, 995, 2049, 3659, 4045, 6000, 6665, 6666, 6667, 6668, 67 6669, 6697 68] 69 70 71def can_connect(port): 72 # this test is only really useful on unices where SO_REUSE_PORT is available 73 # so on Windows, where this test is expensive, skip it 74 if platform.system() == 'Windows': return False 75 s = socket.socket() 76 try: 77 s.connect(('localhost', port)) 78 return True 79 except socket.error as e: 80 return False 81 finally: 82 s.close() 83 84 85def can_bind(port, proto): 86 s = socket.socket(proto, socket.SOCK_STREAM) 87 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 88 try: 89 s.bind(('localhost', port)) 90 return True 91 except socket.error as e: 92 return False 93 finally: 94 s.close() 95 96 97def refill_pool(max_timeout, req): 98 """Scan for ports not marked for being in use""" 99 chk = [ 100 port for port in range(1025, 32766) 101 if port not in cronet_restricted_ports 102 ] 103 random.shuffle(chk) 104 for i in chk: 105 if len(pool) > 100: break 106 if i in in_use: 107 age = time.time() - in_use[i] 108 if age < max_timeout: 109 continue 110 req.log_message("kill old request %d" % i) 111 del in_use[i] 112 if can_bind(i, socket.AF_INET) and can_bind( 113 i, socket.AF_INET6) and not can_connect(i): 114 req.log_message("found available port %d" % i) 115 pool.append(i) 116 117 118def allocate_port(req): 119 global pool 120 global in_use 121 global mu 122 mu.acquire() 123 max_timeout = 600 124 while not pool: 125 refill_pool(max_timeout, req) 126 if not pool: 127 req.log_message("failed to find ports: retrying soon") 128 mu.release() 129 time.sleep(1) 130 mu.acquire() 131 max_timeout /= 2 132 port = pool[0] 133 pool = pool[1:] 134 in_use[port] = time.time() 135 mu.release() 136 return port 137 138 139keep_running = True 140 141 142class Handler(BaseHTTPRequestHandler): 143 144 def setup(self): 145 # If the client is unreachable for 5 seconds, close the connection 146 self.timeout = 5 147 BaseHTTPRequestHandler.setup(self) 148 149 def do_GET(self): 150 global keep_running 151 global mu 152 if self.path == '/get': 153 # allocate a new port, it will stay bound for ten minutes and until 154 # it's unused 155 self.send_response(200) 156 self.send_header('Content-Type', 'text/plain') 157 self.end_headers() 158 p = allocate_port(self) 159 self.log_message('allocated port %d' % p) 160 self.wfile.write(bytes('%d' % p)) 161 elif self.path[0:6] == '/drop/': 162 self.send_response(200) 163 self.send_header('Content-Type', 'text/plain') 164 self.end_headers() 165 p = int(self.path[6:]) 166 mu.acquire() 167 if p in in_use: 168 del in_use[p] 169 pool.append(p) 170 k = 'known' 171 else: 172 k = 'unknown' 173 mu.release() 174 self.log_message('drop %s port %d' % (k, p)) 175 elif self.path == '/version_number': 176 # fetch a version string and the current process pid 177 self.send_response(200) 178 self.send_header('Content-Type', 'text/plain') 179 self.end_headers() 180 self.wfile.write(bytes('%d' % _MY_VERSION)) 181 elif self.path == '/dump': 182 # yaml module is not installed on Macs and Windows machines by default 183 # so we import it lazily (/dump action is only used for debugging) 184 import yaml 185 self.send_response(200) 186 self.send_header('Content-Type', 'text/plain') 187 self.end_headers() 188 mu.acquire() 189 now = time.time() 190 out = yaml.dump({ 191 'pool': pool, 192 'in_use': dict((k, now - v) for k, v in in_use.items()) 193 }) 194 mu.release() 195 self.wfile.write(bytes(out)) 196 elif self.path == '/quitquitquit': 197 self.send_response(200) 198 self.end_headers() 199 self.server.shutdown() 200 201 202class ThreadedHTTPServer(ThreadingMixIn, HTTPServer): 203 """Handle requests in a separate thread""" 204 205 206ThreadedHTTPServer(('', args.port), Handler).serve_forever() 207