• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2# Copyright 2015 gRPC authors.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15"""Manage TCP ports for unit tests; started by run_tests.py"""
16
17from __future__ import print_function
18
19import argparse
20import hashlib
21import os
22import platform
23import random
24import socket
25import sys
26import threading
27import time
28
29from six.moves.BaseHTTPServer import BaseHTTPRequestHandler
30from six.moves.BaseHTTPServer import HTTPServer
31from six.moves.socketserver import ThreadingMixIn
32
# Bump this number on every change so that CI servers which are already
# running notice the difference and pick up the new code.
# Every change must remain backwards compatible.
_MY_VERSION = 21

# Invoked with the single argument "dump_version": print the version and quit
# before any argument parsing or server start-up happens.
if sys.argv[1:] == ["dump_version"]:
    print(_MY_VERSION)
    sys.exit(0)
41
# Command line: the port to serve on and an optional log file.
argp = argparse.ArgumentParser(description="Server for httpcli_test")
argp.add_argument("-p", "--port", type=int, default=12345)
argp.add_argument("-l", "--logfile", type=str, default=None)
args = argp.parse_args()

if args.logfile is not None:
    # Detach from the inherited standard streams, then send both stdout and
    # stderr to the requested log file (one shared file object).
    for _std_stream in (sys.stdin, sys.stderr, sys.stdout):
        _std_stream.close()
    sys.stderr = open(args.logfile, "w")
    sys.stdout = sys.stderr

print("port server running on port %d" % args.port)
55
# Ports that passed the availability checks and are ready to be handed out.
pool = []
# Handed-out ports, mapped to the time.time() value at which each was allocated.
in_use = {}
# Lock taken around every access to pool and in_use.
mu = threading.Lock()
59
# Cronet refuses to use the ports listed below (see
# https://cs.chromium.org/chromium/src/net/base/port_util.cc). A Cronet test
# that is handed one of them fails (see issue #12149), so these ports must
# never be placed in the pool.
# fmt: off
cronet_restricted_ports = [
    1, 7, 9, 11, 13, 15, 17, 19, 20, 21,
    22, 23, 25, 37, 42, 43, 53, 77, 79, 87,
    95, 101, 102, 103, 104, 109, 110, 111, 113, 115,
    117, 119, 123, 135, 139, 143, 179, 389, 465, 512,
    513, 514, 515, 526, 530, 531, 532, 540, 556, 563,
    587, 601, 636, 993, 995, 2049, 3659, 4045, 6000, 6665,
    6666, 6667, 6668, 6669, 6697,
]
# fmt: on
131
132
def can_connect(port):
    """Report whether a TCP connection to localhost:port succeeds.

    Always returns False on Windows, where the probe is skipped.
    """
    # The probe is only really useful on unices where SO_REUSE_PORT is
    # available; on Windows it is expensive, so it is skipped there.
    if platform.system() == "Windows":
        return False
    probe = socket.socket()
    try:
        probe.connect(("localhost", port))
    except socket.error:
        connected = False
    else:
        connected = True
    finally:
        probe.close()
    return connected
146
147
def can_bind(port, proto):
    """Report whether a stream socket of family `proto` can bind localhost:port.

    The socket is created with SO_REUSEADDR set and is always closed before
    returning. Errors raised while creating or configuring the socket are
    not caught here; only a failing bind() is turned into False.
    """
    sock = socket.socket(proto, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    bound = True
    try:
        sock.bind(("localhost", port))
    except socket.error:
        bound = False
    finally:
        sock.close()
    return bound
158
159
def refill_pool(max_timeout, req):
    """Scan for ports not marked for being in use and add them to the pool.

    Candidate ports 1025..32765, minus the Cronet-restricted ones, are visited
    in random order until the pool holds more than 100 entries or the
    candidates run out.

    Args:
        max_timeout: age in seconds after which an entry in `in_use` is
            considered abandoned; such an entry is removed and its port is
            re-checked like any other candidate.
        req: object providing `log_message(str)`, used for progress logging.

    The caller is expected to hold `mu`; this function mutates `pool` and
    `in_use` without taking the lock itself.
    """
    # Build the exclusion set once: testing membership against the list would
    # rescan it for every one of the ~31k candidate ports.
    restricted = frozenset(cronet_restricted_ports)
    candidates = [
        port for port in range(1025, 32766) if port not in restricted
    ]
    # Random order spreads allocations over the whole range.
    random.shuffle(candidates)
    for port in candidates:
        if len(pool) > 100:
            break
        if port in in_use:
            age = time.time() - in_use[port]
            if age < max_timeout:
                continue
            # The reservation has outlived the timeout: reclaim it.
            req.log_message("kill old request %d" % port)
            del in_use[port]
        # A port qualifies only if it can be bound on both IPv4 and IPv6 and
        # nothing is currently accepting connections on it.
        if (
            can_bind(port, socket.AF_INET)
            and can_bind(port, socket.AF_INET6)
            and not can_connect(port)
        ):
            req.log_message("found available port %d" % port)
            pool.append(port)
184
185
def allocate_port(req):
    """Reserve one port from the pool and return it.

    If the pool is empty it is refilled first. When a refill finds nothing,
    the call sleeps for one second and retries with half the previous
    reservation timeout (starting from 600 seconds), so that older entries in
    `in_use` become eligible for reclaiming on later attempts.

    Args:
        req: object providing `log_message(str)`; passed on to refill_pool
            and used to log failed attempts.

    Returns:
        The allocated port number. The port is recorded in `in_use` together
        with the allocation time.
    """
    max_timeout = 600
    while True:
        # The context manager releases mu even when the scan or the logging
        # raises; a lock left held would block every later request.
        with mu:
            if not pool:
                refill_pool(max_timeout, req)
            if pool:
                port = pool.pop(0)
                in_use[port] = time.time()
                return port
            req.log_message("failed to find ports: retrying soon")
        # Wait without holding the lock so that other requests (for example
        # /drop) can make progress in the meantime.
        time.sleep(1)
        max_timeout /= 2
205
206
# Module-level run flag. NOTE(review): nothing visible in this file reads it;
# confirm whether it is still needed.
keep_running = True
208
209
class Handler(BaseHTTPRequestHandler):
    """HTTP request handler implementing the port server's GET endpoints.

    Endpoints:
        /get             reserve a port; the body is the port number as text
        /drop/<port>     give a reserved port back to the pool
        /version_number  the body is _MY_VERSION as text
        /dump            YAML dump of the pool and of the reserved ports with
                         their ages in seconds (debugging aid)
        /quitquitquit    shut the server down

    Any other path gets no response from this handler.
    """

    def setup(self):
        # If the client is unreachable for 5 seconds, close the connection
        self.timeout = 5
        BaseHTTPRequestHandler.setup(self)

    def _send_plain_ok(self):
        """Send a 200 status line and a text/plain header block."""
        self.send_response(200)
        self.send_header("Content-Type", "text/plain")
        self.end_headers()

    def do_GET(self):
        """Dispatch a GET request on its path (see the class docstring)."""
        if self.path == "/get":
            # Reserve a new port. It stays reserved until it is dropped;
            # reservations older than the timeout used by allocate_port may
            # be reclaimed when the pool is refilled.
            self._send_plain_ok()
            p = allocate_port(self)
            self.log_message("allocated port %d" % p)
            self.wfile.write(str(p).encode("ascii"))
        elif self.path[0:6] == "/drop/":
            # Validate before any header is sent so that a malformed request
            # gets a proper 400 instead of a 200 followed by a handler crash.
            try:
                p = int(self.path[6:])
            except ValueError:
                self.send_error(400, "invalid port in drop request")
                return
            self._send_plain_ok()
            with mu:
                if p in in_use:
                    del in_use[p]
                    pool.append(p)
                    k = "known"
                else:
                    k = "unknown"
            self.log_message("drop %s port %d" % (k, p))
        elif self.path == "/version_number":
            # report the version of this script
            self._send_plain_ok()
            self.wfile.write(str(_MY_VERSION).encode("ascii"))
        elif self.path == "/dump":
            # yaml module is not installed on Macs and Windows machines by default
            # so we import it lazily (/dump action is only used for debugging)
            import yaml

            self._send_plain_ok()
            # Take the snapshot under the lock; `with` also releases it if
            # yaml.dump raises.
            with mu:
                now = time.time()
                out = yaml.dump(
                    {
                        "pool": pool,
                        "in_use": dict(
                            (k, now - v) for k, v in in_use.items()
                        ),
                    }
                )
            self.wfile.write(out.encode("ascii"))
        elif self.path == "/quitquitquit":
            self.send_response(200)
            self.end_headers()
            # shutdown() waits for the serve_forever() loop to stop, so this
            # relies on the handler running outside that loop's thread (the
            # server class in this file mixes in ThreadingMixIn).
            self.server.shutdown()
272
273
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
    """HTTPServer that handles each request in a separate thread.

    All behaviour comes from the two base classes; nothing is overridden.
    """
276
277
# Listen on every interface at the configured port and serve until shut down.
_server = ThreadedHTTPServer(("", args.port), Handler)
_server.serve_forever()
279