• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python2.7
2# Copyright 2015 gRPC authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Manage TCP ports for unit tests; started by run_tests.py"""
16
17from __future__ import print_function
18
19import argparse
20from six.moves.BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
21from six.moves.socketserver import ThreadingMixIn
22import hashlib
23import os
24import socket
25import sys
26import time
27import random
28import threading
29import platform
30
# Bump this number with every change so that CI machines which already have
# a port server running notice the new version.
# Every change must remain backwards compatible.
_MY_VERSION = 21

# Invoked with the single argument 'dump_version': report the version and
# exit before any argument parsing or server start-up happens.
if sys.argv[1:] == ['dump_version']:
    print(_MY_VERSION)
    sys.exit(0)

argp = argparse.ArgumentParser(description='Server for httpcli_test')
argp.add_argument('-p', '--port', type=int, default=12345)
argp.add_argument('-l', '--logfile', type=str, default=None)
args = argp.parse_args()

if args.logfile is not None:
    # Close the inherited standard streams, then route both stdout and
    # stderr into the requested log file.
    for stream in (sys.stdin, sys.stderr, sys.stdout):
        stream.close()
    sys.stderr = open(args.logfile, 'w')
    sys.stdout = sys.stderr

print('port server running on port %d' % args.port)
53
# Ports that a scan found to be free and that are ready to be handed out.
pool = []
# Maps every handed-out port to the time.time() value of its allocation.
in_use = {}
# Held while pool and in_use are inspected or modified.
mu = threading.Lock()

# Cronet refuses to use the ports listed below (see
# https://cs.chromium.org/chromium/src/net/base/port_util.cc). A Cronet test
# that is given one of them fails (see issue #12149), so they must never be
# placed in the pool.
cronet_restricted_ports = [
    1, 7, 9, 11, 13, 15, 17, 19, 20, 21,
    22, 23, 25, 37, 42, 43, 53, 77, 79, 87,
    95, 101, 102, 103, 104, 109, 110, 111, 113, 115,
    117, 119, 123, 135, 139, 143, 179, 389, 465, 512,
    513, 514, 515, 526, 530, 531, 532, 540, 556, 563,
    587, 601, 636, 993, 995, 2049, 3659, 4045, 6000, 6665,
    6666, 6667, 6668, 6669, 6697,
]
69
70
def can_connect(port):
    """Report whether a TCP connection to localhost:port succeeds.

    Returns True when the connection is accepted and False when it fails
    with a socket error. On Windows the probe is skipped and the result is
    always False.
    """
    # The probe is only really useful on unices where SO_REUSE_PORT is
    # available; on Windows it is expensive, so skip it.
    if platform.system() == 'Windows':
        return False
    probe = socket.socket()
    try:
        probe.connect(('localhost', port))
    except socket.error:
        reachable = False
    else:
        reachable = True
    finally:
        probe.close()
    return reachable
83
84
def can_bind(port, proto):
    """Report whether a stream socket of family `proto` can bind localhost:port.

    The socket is created with SO_REUSEADDR set and is always closed again
    before returning. Returns True when bind() succeeds and False when it
    fails with a socket error.
    """
    sock = socket.socket(proto, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    try:
        sock.bind(('localhost', port))
    except socket.error:
        bound = False
    else:
        bound = True
    finally:
        sock.close()
    return bound
95
96
def refill_pool(max_timeout, req):
    """Scan for ports that are not marked as in use and add them to the pool.

    Candidate ports 1025..32765 (minus the Cronet-restricted ones) are
    visited in random order until the pool holds more than 100 entries.

    Args:
        max_timeout: age in seconds after which an entry in `in_use` is
            considered stale; stale entries are removed and the port is
            probed like any other candidate.
        req: object whose `log_message` method receives progress messages.
    """
    candidates = []
    for port in range(1025, 32766):
        if port not in cronet_restricted_ports:
            candidates.append(port)
    random.shuffle(candidates)

    for port in candidates:
        if len(pool) > 100:
            break
        if port in in_use:
            if time.time() - in_use[port] < max_timeout:
                # Still reserved by a recent request; leave it alone.
                continue
            req.log_message("kill old request %d" % port)
            del in_use[port]
        # A port qualifies only if it can be bound for both IPv4 and IPv6
        # and nothing is currently accepting connections on it.
        bindable = (can_bind(port, socket.AF_INET) and
                    can_bind(port, socket.AF_INET6))
        if bindable and not can_connect(port):
            req.log_message("found available port %d" % port)
            pool.append(port)
116
117
def allocate_port(req):
    """Reserve one port from the pool and return it.

    If the pool is empty it is refilled with `refill_pool`. While no port
    can be found the function retries once per second, halving the age
    threshold passed to `refill_pool` on every retry so that older entries
    in `in_use` become eligible for reclaiming sooner.

    Args:
        req: object whose `log_message` method is used for logging; it is
            also handed on to `refill_pool`.

    Returns:
        The allocated port number. The port is recorded in `in_use` together
        with the current time.
    """
    max_timeout = 600
    while True:
        # Taking the lock through `with` guarantees that it is released even
        # when the scan or the logging call raises; a bare acquire()/release()
        # pair would leave the lock held and block every later request.
        with mu:
            if not pool:
                refill_pool(max_timeout, req)
            if pool:
                # pop(0) mutates the shared list in place, so no `global`
                # rebinding of `pool` is required.
                port = pool.pop(0)
                in_use[port] = time.time()
                return port
            req.log_message("failed to find ports: retrying soon")
        # The lock is released here so that other requests (for example
        # /drop/) can make progress while this one waits.
        time.sleep(1)
        max_timeout /= 2
137
138
# Module-level run flag, initialised to True. Nothing in this file reads it
# or assigns it again after this point.
keep_running = True
140
141
class Handler(BaseHTTPRequestHandler):
    """Request handler implementing the port server's HTTP GET endpoints.

    Supported paths:
        /get             allocate a port and return it as decimal text
        /drop/<port>     return a previously allocated port to the pool
        /version_number  return the server's version number
        /dump            return a YAML dump of the pool and in-use ports
        /quitquitquit    shut the server down

    Any other path produces no response.
    """

    def setup(self):
        # If the client is unreachable for 5 seconds, close the connection
        self.timeout = 5
        BaseHTTPRequestHandler.setup(self)

    @staticmethod
    def _to_bytes(text):
        """Return `text` as bytes suitable for writing to `wfile`.

        `bytes(some_str)` raises TypeError on Python 3 because no encoding
        is given. Values that are already bytes (every `str` on Python 2)
        are passed through unchanged; text is encoded as UTF-8.
        """
        if isinstance(text, bytes):
            return text
        return text.encode('utf-8')

    def _send_text_headers(self):
        """Send a 200 status line followed by text/plain response headers."""
        self.send_response(200)
        self.send_header('Content-Type', 'text/plain')
        self.end_headers()

    def do_GET(self):
        """Dispatch a GET request according to `self.path`."""
        if self.path == '/get':
            # Allocate a new port. The headers go out before the allocation,
            # which may block until a port becomes available.
            self._send_text_headers()
            p = allocate_port(self)
            self.log_message('allocated port %d' % p)
            self.wfile.write(self._to_bytes('%d' % p))
        elif self.path.startswith('/drop/'):
            self._send_text_headers()
            p = int(self.path[6:])
            # `with` releases the lock even if the body raises.
            with mu:
                if p in in_use:
                    del in_use[p]
                    pool.append(p)
                    k = 'known'
                else:
                    k = 'unknown'
            self.log_message('drop %s port %d' % (k, p))
        elif self.path == '/version_number':
            # fetch the version number
            self._send_text_headers()
            self.wfile.write(self._to_bytes('%d' % _MY_VERSION))
        elif self.path == '/dump':
            # yaml module is not installed on Macs and Windows machines by
            # default so we import it lazily (/dump action is only used for
            # debugging)
            import yaml
            self._send_text_headers()
            # Snapshot the state under the lock; a failure inside yaml.dump
            # can no longer leave the lock held.
            with mu:
                now = time.time()
                out = yaml.dump({
                    'pool': pool,
                    'in_use': dict((k, now - v) for k, v in in_use.items())
                })
            self.wfile.write(self._to_bytes(out))
        elif self.path == '/quitquitquit':
            self.send_response(200)
            self.end_headers()
            self.server.shutdown()
200
201
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """HTTP server that handles every request in a separate thread.

    The class adds nothing of its own: it only combines ThreadingMixIn with
    HTTPServer.
    """
204
205
# Start the threaded server on the configured port and serve requests until
# the server is shut down (see the /quitquitquit endpoint).
ThreadedHTTPServer(
    ('', args.port),
    Handler,
).serve_forever()
207