1#!/usr/bin/env python3 2# Copyright 2015 gRPC authors. 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15"""Manage TCP ports for unit tests; started by run_tests.py""" 16 17from __future__ import print_function 18 19import argparse 20import hashlib 21import os 22import platform 23import random 24import socket 25import sys 26import threading 27import time 28 29from six.moves.BaseHTTPServer import BaseHTTPRequestHandler 30from six.moves.BaseHTTPServer import HTTPServer 31from six.moves.socketserver import ThreadingMixIn 32 33# increment this number whenever making a change to ensure that 34# the changes are picked up by running CI servers 35# note that all changes must be backwards compatible 36_MY_VERSION = 21 37 38if len(sys.argv) == 2 and sys.argv[1] == "dump_version": 39 print(_MY_VERSION) 40 sys.exit(0) 41 42argp = argparse.ArgumentParser(description="Server for httpcli_test") 43argp.add_argument("-p", "--port", default=12345, type=int) 44argp.add_argument("-l", "--logfile", default=None, type=str) 45args = argp.parse_args() 46 47if args.logfile is not None: 48 sys.stdin.close() 49 sys.stderr.close() 50 sys.stdout.close() 51 sys.stderr = open(args.logfile, "w") 52 sys.stdout = sys.stderr 53 54print("port server running on port %d" % args.port) 55 56pool = [] 57in_use = {} 58mu = threading.Lock() 59 60# Cronet restricts the following ports to be used (see 61# https://cs.chromium.org/chromium/src/net/base/port_util.cc). When one of these 62# ports is used in a Cronet test, the test would fail (see issue #12149). These 63# ports must be excluded from pool. 64cronet_restricted_ports = [ 65 1, 66 7, 67 9, 68 11, 69 13, 70 15, 71 17, 72 19, 73 20, 74 21, 75 22, 76 23, 77 25, 78 37, 79 42, 80 43, 81 53, 82 77, 83 79, 84 87, 85 95, 86 101, 87 102, 88 103, 89 104, 90 109, 91 110, 92 111, 93 113, 94 115, 95 117, 96 119, 97 123, 98 135, 99 139, 100 143, 101 179, 102 389, 103 465, 104 512, 105 513, 106 514, 107 515, 108 526, 109 530, 110 531, 111 532, 112 540, 113 556, 114 563, 115 587, 116 601, 117 636, 118 993, 119 995, 120 2049, 121 3659, 122 4045, 123 6000, 124 6665, 125 6666, 126 6667, 127 6668, 128 6669, 129 6697, 130] 131 132 133def can_connect(port): 134 # this test is only really useful on unices where SO_REUSE_PORT is available 135 # so on Windows, where this test is expensive, skip it 136 if platform.system() == "Windows": 137 return False 138 s = socket.socket() 139 try: 140 s.connect(("localhost", port)) 141 return True 142 except socket.error as e: 143 return False 144 finally: 145 s.close() 146 147 148def can_bind(port, proto): 149 s = socket.socket(proto, socket.SOCK_STREAM) 150 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 151 try: 152 s.bind(("localhost", port)) 153 return True 154 except socket.error as e: 155 return False 156 finally: 157 s.close() 158 159 160def refill_pool(max_timeout, req): 161 """Scan for ports not marked for being in use""" 162 chk = [ 163 port 164 for port in range(1025, 32766) 165 if port not in cronet_restricted_ports 166 ] 167 random.shuffle(chk) 168 for i in chk: 169 if len(pool) > 100: 170 break 171 if i in in_use: 172 age = time.time() - in_use[i] 173 if age < max_timeout: 174 continue 175 req.log_message("kill old request %d" % i) 176 del in_use[i] 177 if ( 178 can_bind(i, socket.AF_INET) 179 and can_bind(i, socket.AF_INET6) 180 and not can_connect(i) 181 ): 182 req.log_message("found available port %d" % i) 183 pool.append(i) 184 185 186def allocate_port(req): 187 global pool 188 global in_use 189 global mu 190 mu.acquire() 191 max_timeout = 600 192 while not pool: 193 refill_pool(max_timeout, req) 194 if not pool: 195 req.log_message("failed to find ports: retrying soon") 196 mu.release() 197 time.sleep(1) 198 mu.acquire() 199 max_timeout /= 2 200 port = pool[0] 201 pool = pool[1:] 202 in_use[port] = time.time() 203 mu.release() 204 return port 205 206 207keep_running = True 208 209 210class Handler(BaseHTTPRequestHandler): 211 def setup(self): 212 # If the client is unreachable for 5 seconds, close the connection 213 self.timeout = 5 214 BaseHTTPRequestHandler.setup(self) 215 216 def do_GET(self): 217 global keep_running 218 global mu 219 if self.path == "/get": 220 # allocate a new port, it will stay bound for ten minutes and until 221 # it's unused 222 self.send_response(200) 223 self.send_header("Content-Type", "text/plain") 224 self.end_headers() 225 p = allocate_port(self) 226 self.log_message("allocated port %d" % p) 227 self.wfile.write(str(p).encode("ascii")) 228 elif self.path[0:6] == "/drop/": 229 self.send_response(200) 230 self.send_header("Content-Type", "text/plain") 231 self.end_headers() 232 p = int(self.path[6:]) 233 mu.acquire() 234 if p in in_use: 235 del in_use[p] 236 pool.append(p) 237 k = "known" 238 else: 239 k = "unknown" 240 mu.release() 241 self.log_message("drop %s port %d" % (k, p)) 242 elif self.path == "/version_number": 243 # fetch a version string and the current process pid 244 self.send_response(200) 245 self.send_header("Content-Type", "text/plain") 246 self.end_headers() 247 self.wfile.write(str(_MY_VERSION).encode("ascii")) 248 elif self.path == "/dump": 249 # yaml module is not installed on Macs and Windows machines by default 250 # so we import it lazily (/dump action is only used for debugging) 251 import yaml 252 253 self.send_response(200) 254 self.send_header("Content-Type", "text/plain") 255 self.end_headers() 256 mu.acquire() 257 now = time.time() 258 out = yaml.dump( 259 { 260 "pool": pool, 261 "in_use": dict( 262 (k, now - v) for k, v in list(in_use.items()) 263 ), 264 } 265 ) 266 mu.release() 267 self.wfile.write(out.encode("ascii")) 268 elif self.path == "/quitquitquit": 269 self.send_response(200) 270 self.end_headers() 271 self.server.shutdown() 272 273 274class ThreadedHTTPServer(ThreadingMixIn, HTTPServer): 275 """Handle requests in a separate thread""" 276 277 278ThreadedHTTPServer(("", args.port), Handler).serve_forever() 279