• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Unittests for the various HTTPServer modules.
2
3Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>,
4Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.
5"""
6
7from http.server import BaseHTTPRequestHandler, HTTPServer, \
8     SimpleHTTPRequestHandler, CGIHTTPRequestHandler
9from http import server, HTTPStatus
10
11import os
12import socket
13import sys
14import re
15import base64
16import ntpath
17import shutil
18import email.message
19import email.utils
20import html
21import http.client
22import urllib.parse
23import tempfile
24import time
25import datetime
26import threading
27from unittest import mock
28from io import BytesIO
29
30import unittest
31from test import support
32
33
34class NoLogRequestHandler:
35    def log_message(self, *args):
36        # don't write log messages to stderr
37        pass
38
39    def read(self, n=None):
40        return ''
41
42
43class TestServerThread(threading.Thread):
44    def __init__(self, test_object, request_handler):
45        threading.Thread.__init__(self)
46        self.request_handler = request_handler
47        self.test_object = test_object
48
49    def run(self):
50        self.server = HTTPServer(('localhost', 0), self.request_handler)
51        self.test_object.HOST, self.test_object.PORT = self.server.socket.getsockname()
52        self.test_object.server_started.set()
53        self.test_object = None
54        try:
55            self.server.serve_forever(0.05)
56        finally:
57            self.server.server_close()
58
59    def stop(self):
60        self.server.shutdown()
61        self.join()
62
63
64class BaseTestCase(unittest.TestCase):
65    def setUp(self):
66        self._threads = support.threading_setup()
67        os.environ = support.EnvironmentVarGuard()
68        self.server_started = threading.Event()
69        self.thread = TestServerThread(self, self.request_handler)
70        self.thread.start()
71        self.server_started.wait()
72
73    def tearDown(self):
74        self.thread.stop()
75        self.thread = None
76        os.environ.__exit__()
77        support.threading_cleanup(*self._threads)
78
79    def request(self, uri, method='GET', body=None, headers={}):
80        self.connection = http.client.HTTPConnection(self.HOST, self.PORT)
81        self.connection.request(method, uri, body, headers)
82        return self.connection.getresponse()
83
84
85class BaseHTTPServerTestCase(BaseTestCase):
86    class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
87        protocol_version = 'HTTP/1.1'
88        default_request_version = 'HTTP/1.1'
89
90        def do_TEST(self):
91            self.send_response(HTTPStatus.NO_CONTENT)
92            self.send_header('Content-Type', 'text/html')
93            self.send_header('Connection', 'close')
94            self.end_headers()
95
96        def do_KEEP(self):
97            self.send_response(HTTPStatus.NO_CONTENT)
98            self.send_header('Content-Type', 'text/html')
99            self.send_header('Connection', 'keep-alive')
100            self.end_headers()
101
102        def do_KEYERROR(self):
103            self.send_error(999)
104
105        def do_NOTFOUND(self):
106            self.send_error(HTTPStatus.NOT_FOUND)
107
108        def do_EXPLAINERROR(self):
109            self.send_error(999, "Short Message",
110                            "This is a long \n explanation")
111
112        def do_CUSTOM(self):
113            self.send_response(999)
114            self.send_header('Content-Type', 'text/html')
115            self.send_header('Connection', 'close')
116            self.end_headers()
117
118        def do_LATINONEHEADER(self):
119            self.send_response(999)
120            self.send_header('X-Special', 'Dängerous Mind')
121            self.send_header('Connection', 'close')
122            self.end_headers()
123            body = self.headers['x-special-incoming'].encode('utf-8')
124            self.wfile.write(body)
125
126        def do_SEND_ERROR(self):
127            self.send_error(int(self.path[1:]))
128
129        def do_HEAD(self):
130            self.send_error(int(self.path[1:]))
131
132    def setUp(self):
133        BaseTestCase.setUp(self)
134        self.con = http.client.HTTPConnection(self.HOST, self.PORT)
135        self.con.connect()
136
137    def test_command(self):
138        self.con.request('GET', '/')
139        res = self.con.getresponse()
140        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
141
142    def test_request_line_trimming(self):
143        self.con._http_vsn_str = 'HTTP/1.1\n'
144        self.con.putrequest('XYZBOGUS', '/')
145        self.con.endheaders()
146        res = self.con.getresponse()
147        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
148
149    def test_version_bogus(self):
150        self.con._http_vsn_str = 'FUBAR'
151        self.con.putrequest('GET', '/')
152        self.con.endheaders()
153        res = self.con.getresponse()
154        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
155
156    def test_version_digits(self):
157        self.con._http_vsn_str = 'HTTP/9.9.9'
158        self.con.putrequest('GET', '/')
159        self.con.endheaders()
160        res = self.con.getresponse()
161        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
162
163    def test_version_none_get(self):
164        self.con._http_vsn_str = ''
165        self.con.putrequest('GET', '/')
166        self.con.endheaders()
167        res = self.con.getresponse()
168        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
169
170    def test_version_none(self):
171        # Test that a valid method is rejected when not HTTP/1.x
172        self.con._http_vsn_str = ''
173        self.con.putrequest('CUSTOM', '/')
174        self.con.endheaders()
175        res = self.con.getresponse()
176        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
177
178    def test_version_invalid(self):
179        self.con._http_vsn = 99
180        self.con._http_vsn_str = 'HTTP/9.9'
181        self.con.putrequest('GET', '/')
182        self.con.endheaders()
183        res = self.con.getresponse()
184        self.assertEqual(res.status, HTTPStatus.HTTP_VERSION_NOT_SUPPORTED)
185
186    def test_send_blank(self):
187        self.con._http_vsn_str = ''
188        self.con.putrequest('', '')
189        self.con.endheaders()
190        res = self.con.getresponse()
191        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
192
193    def test_header_close(self):
194        self.con.putrequest('GET', '/')
195        self.con.putheader('Connection', 'close')
196        self.con.endheaders()
197        res = self.con.getresponse()
198        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
199
200    def test_header_keep_alive(self):
201        self.con._http_vsn_str = 'HTTP/1.1'
202        self.con.putrequest('GET', '/')
203        self.con.putheader('Connection', 'keep-alive')
204        self.con.endheaders()
205        res = self.con.getresponse()
206        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
207
208    def test_handler(self):
209        self.con.request('TEST', '/')
210        res = self.con.getresponse()
211        self.assertEqual(res.status, HTTPStatus.NO_CONTENT)
212
213    def test_return_header_keep_alive(self):
214        self.con.request('KEEP', '/')
215        res = self.con.getresponse()
216        self.assertEqual(res.getheader('Connection'), 'keep-alive')
217        self.con.request('TEST', '/')
218        self.addCleanup(self.con.close)
219
220    def test_internal_key_error(self):
221        self.con.request('KEYERROR', '/')
222        res = self.con.getresponse()
223        self.assertEqual(res.status, 999)
224
225    def test_return_custom_status(self):
226        self.con.request('CUSTOM', '/')
227        res = self.con.getresponse()
228        self.assertEqual(res.status, 999)
229
230    def test_return_explain_error(self):
231        self.con.request('EXPLAINERROR', '/')
232        res = self.con.getresponse()
233        self.assertEqual(res.status, 999)
234        self.assertTrue(int(res.getheader('Content-Length')))
235
236    def test_latin1_header(self):
237        self.con.request('LATINONEHEADER', '/', headers={
238            'X-Special-Incoming':       'Ärger mit Unicode'
239        })
240        res = self.con.getresponse()
241        self.assertEqual(res.getheader('X-Special'), 'Dängerous Mind')
242        self.assertEqual(res.read(), 'Ärger mit Unicode'.encode('utf-8'))
243
244    def test_error_content_length(self):
245        # Issue #16088: standard error responses should have a content-length
246        self.con.request('NOTFOUND', '/')
247        res = self.con.getresponse()
248        self.assertEqual(res.status, HTTPStatus.NOT_FOUND)
249
250        data = res.read()
251        self.assertEqual(int(res.getheader('Content-Length')), len(data))
252
253    def test_send_error(self):
254        allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED,
255                                         HTTPStatus.RESET_CONTENT)
256        for code in (HTTPStatus.NO_CONTENT, HTTPStatus.NOT_MODIFIED,
257                     HTTPStatus.PROCESSING, HTTPStatus.RESET_CONTENT,
258                     HTTPStatus.SWITCHING_PROTOCOLS):
259            self.con.request('SEND_ERROR', '/{}'.format(code))
260            res = self.con.getresponse()
261            self.assertEqual(code, res.status)
262            self.assertEqual(None, res.getheader('Content-Length'))
263            self.assertEqual(None, res.getheader('Content-Type'))
264            if code not in allow_transfer_encoding_codes:
265                self.assertEqual(None, res.getheader('Transfer-Encoding'))
266
267            data = res.read()
268            self.assertEqual(b'', data)
269
270    def test_head_via_send_error(self):
271        allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED,
272                                         HTTPStatus.RESET_CONTENT)
273        for code in (HTTPStatus.OK, HTTPStatus.NO_CONTENT,
274                     HTTPStatus.NOT_MODIFIED, HTTPStatus.RESET_CONTENT,
275                     HTTPStatus.SWITCHING_PROTOCOLS):
276            self.con.request('HEAD', '/{}'.format(code))
277            res = self.con.getresponse()
278            self.assertEqual(code, res.status)
279            if code == HTTPStatus.OK:
280                self.assertTrue(int(res.getheader('Content-Length')) > 0)
281                self.assertIn('text/html', res.getheader('Content-Type'))
282            else:
283                self.assertEqual(None, res.getheader('Content-Length'))
284                self.assertEqual(None, res.getheader('Content-Type'))
285            if code not in allow_transfer_encoding_codes:
286                self.assertEqual(None, res.getheader('Transfer-Encoding'))
287
288            data = res.read()
289            self.assertEqual(b'', data)
290
291
292class RequestHandlerLoggingTestCase(BaseTestCase):
293    class request_handler(BaseHTTPRequestHandler):
294        protocol_version = 'HTTP/1.1'
295        default_request_version = 'HTTP/1.1'
296
297        def do_GET(self):
298            self.send_response(HTTPStatus.OK)
299            self.end_headers()
300
301        def do_ERROR(self):
302            self.send_error(HTTPStatus.NOT_FOUND, 'File not found')
303
304    def test_get(self):
305        self.con = http.client.HTTPConnection(self.HOST, self.PORT)
306        self.con.connect()
307
308        with support.captured_stderr() as err:
309            self.con.request('GET', '/')
310            self.con.getresponse()
311
312        self.assertTrue(
313            err.getvalue().endswith('"GET / HTTP/1.1" 200 -\n'))
314
315    def test_err(self):
316        self.con = http.client.HTTPConnection(self.HOST, self.PORT)
317        self.con.connect()
318
319        with support.captured_stderr() as err:
320            self.con.request('ERROR', '/')
321            self.con.getresponse()
322
323        lines = err.getvalue().split('\n')
324        self.assertTrue(lines[0].endswith('code 404, message File not found'))
325        self.assertTrue(lines[1].endswith('"ERROR / HTTP/1.1" 404 -'))
326
327
328class SimpleHTTPServerTestCase(BaseTestCase):
329    class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler):
330        pass
331
332    def setUp(self):
333        super().setUp()
334        self.cwd = os.getcwd()
335        basetempdir = tempfile.gettempdir()
336        os.chdir(basetempdir)
337        self.data = b'We are the knights who say Ni!'
338        self.tempdir = tempfile.mkdtemp(dir=basetempdir)
339        self.tempdir_name = os.path.basename(self.tempdir)
340        self.base_url = '/' + self.tempdir_name
341        tempname = os.path.join(self.tempdir, 'test')
342        with open(tempname, 'wb') as temp:
343            temp.write(self.data)
344            temp.flush()
345        mtime = os.stat(tempname).st_mtime
346        # compute last modification datetime for browser cache tests
347        last_modif = datetime.datetime.fromtimestamp(mtime,
348            datetime.timezone.utc)
349        self.last_modif_datetime = last_modif.replace(microsecond=0)
350        self.last_modif_header = email.utils.formatdate(
351            last_modif.timestamp(), usegmt=True)
352
353    def tearDown(self):
354        try:
355            os.chdir(self.cwd)
356            try:
357                shutil.rmtree(self.tempdir)
358            except:
359                pass
360        finally:
361            super().tearDown()
362
363    def check_status_and_reason(self, response, status, data=None):
364        def close_conn():
365            """Don't close reader yet so we can check if there was leftover
366            buffered input"""
367            nonlocal reader
368            reader = response.fp
369            response.fp = None
370        reader = None
371        response._close_conn = close_conn
372
373        body = response.read()
374        self.assertTrue(response)
375        self.assertEqual(response.status, status)
376        self.assertIsNotNone(response.reason)
377        if data:
378            self.assertEqual(data, body)
379        # Ensure the server has not set up a persistent connection, and has
380        # not sent any extra data
381        self.assertEqual(response.version, 10)
382        self.assertEqual(response.msg.get("Connection", "close"), "close")
383        self.assertEqual(reader.read(30), b'', 'Connection should be closed')
384
385        reader.close()
386        return body
387
388    @unittest.skipIf(sys.platform == 'darwin',
389                     'undecodable name cannot always be decoded on macOS')
390    @unittest.skipIf(sys.platform == 'win32',
391                     'undecodable name cannot be decoded on win32')
392    @unittest.skipUnless(support.TESTFN_UNDECODABLE,
393                         'need support.TESTFN_UNDECODABLE')
394    def test_undecodable_filename(self):
395        enc = sys.getfilesystemencoding()
396        filename = os.fsdecode(support.TESTFN_UNDECODABLE) + '.txt'
397        with open(os.path.join(self.tempdir, filename), 'wb') as f:
398            f.write(support.TESTFN_UNDECODABLE)
399        response = self.request(self.base_url + '/')
400        if sys.platform == 'darwin':
401            # On Mac OS the HFS+ filesystem replaces bytes that aren't valid
402            # UTF-8 into a percent-encoded value.
403            for name in os.listdir(self.tempdir):
404                if name != 'test': # Ignore a filename created in setUp().
405                    filename = name
406                    break
407        body = self.check_status_and_reason(response, HTTPStatus.OK)
408        quotedname = urllib.parse.quote(filename, errors='surrogatepass')
409        self.assertIn(('href="%s"' % quotedname)
410                      .encode(enc, 'surrogateescape'), body)
411        self.assertIn(('>%s<' % html.escape(filename, quote=False))
412                      .encode(enc, 'surrogateescape'), body)
413        response = self.request(self.base_url + '/' + quotedname)
414        self.check_status_and_reason(response, HTTPStatus.OK,
415                                     data=support.TESTFN_UNDECODABLE)
416
417    def test_get_dir_redirect_location_domain_injection_bug(self):
418        """Ensure //evil.co/..%2f../../X does not put //evil.co/ in Location.
419
420        //netloc/ in a Location header is a redirect to a new host.
421        https://github.com/python/cpython/issues/87389
422
423        This checks that a path resolving to a directory on our server cannot
424        resolve into a redirect to another server.
425        """
426        os.mkdir(os.path.join(self.tempdir, 'existing_directory'))
427        url = f'/python.org/..%2f..%2f..%2f..%2f..%2f../%0a%0d/../{self.tempdir_name}/existing_directory'
428        expected_location = f'{url}/'  # /python.org.../ single slash single prefix, trailing slash
429        # Canonicalizes to /tmp/tempdir_name/existing_directory which does
430        # exist and is a dir, triggering the 301 redirect logic.
431        response = self.request(url)
432        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
433        location = response.getheader('Location')
434        self.assertEqual(location, expected_location, msg='non-attack failed!')
435
436        # //python.org... multi-slash prefix, no trailing slash
437        attack_url = f'/{url}'
438        response = self.request(attack_url)
439        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
440        location = response.getheader('Location')
441        self.assertFalse(location.startswith('//'), msg=location)
442        self.assertEqual(location, expected_location,
443                msg='Expected Location header to start with a single / and '
444                'end with a / as this is a directory redirect.')
445
446        # ///python.org... triple-slash prefix, no trailing slash
447        attack3_url = f'//{url}'
448        response = self.request(attack3_url)
449        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
450        self.assertEqual(response.getheader('Location'), expected_location)
451
452        # If the second word in the http request (Request-URI for the http
453        # method) is a full URI, we don't worry about it, as that'll be parsed
454        # and reassembled as a full URI within BaseHTTPRequestHandler.send_head
455        # so no errant scheme-less //netloc//evil.co/ domain mixup can happen.
456        attack_scheme_netloc_2slash_url = f'https://pypi.org/{url}'
457        expected_scheme_netloc_location = f'{attack_scheme_netloc_2slash_url}/'
458        response = self.request(attack_scheme_netloc_2slash_url)
459        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
460        location = response.getheader('Location')
461        # We're just ensuring that the scheme and domain make it through, if
462        # there are or aren't multiple slashes at the start of the path that
463        # follows that isn't important in this Location: header.
464        self.assertTrue(location.startswith('https://pypi.org/'), msg=location)
465
466    def test_get(self):
467        #constructs the path relative to the root directory of the HTTPServer
468        response = self.request(self.base_url + '/test')
469        self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
470        # check for trailing "/" which should return 404. See Issue17324
471        response = self.request(self.base_url + '/test/')
472        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
473        response = self.request(self.base_url + '/')
474        self.check_status_and_reason(response, HTTPStatus.OK)
475        response = self.request(self.base_url)
476        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
477        response = self.request(self.base_url + '/?hi=2')
478        self.check_status_and_reason(response, HTTPStatus.OK)
479        response = self.request(self.base_url + '?hi=1')
480        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
481        self.assertEqual(response.getheader("Location"),
482                         self.base_url + "/?hi=1")
483        response = self.request('/ThisDoesNotExist')
484        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
485        response = self.request('/' + 'ThisDoesNotExist' + '/')
486        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
487
488        data = b"Dummy index file\r\n"
489        with open(os.path.join(self.tempdir_name, 'index.html'), 'wb') as f:
490            f.write(data)
491        response = self.request(self.base_url + '/')
492        self.check_status_and_reason(response, HTTPStatus.OK, data)
493
494        # chmod() doesn't work as expected on Windows, and filesystem
495        # permissions are ignored by root on Unix.
496        if os.name == 'posix' and os.geteuid() != 0:
497            os.chmod(self.tempdir, 0)
498            try:
499                response = self.request(self.base_url + '/')
500                self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
501            finally:
502                os.chmod(self.tempdir, 0o755)
503
504    def test_head(self):
505        response = self.request(
506            self.base_url + '/test', method='HEAD')
507        self.check_status_and_reason(response, HTTPStatus.OK)
508        self.assertEqual(response.getheader('content-length'),
509                         str(len(self.data)))
510        self.assertEqual(response.getheader('content-type'),
511                         'application/octet-stream')
512
513    def test_browser_cache(self):
514        """Check that when a request to /test is sent with the request header
515        If-Modified-Since set to date of last modification, the server returns
516        status code 304, not 200
517        """
518        headers = email.message.Message()
519        headers['If-Modified-Since'] = self.last_modif_header
520        response = self.request(self.base_url + '/test', headers=headers)
521        self.check_status_and_reason(response, HTTPStatus.NOT_MODIFIED)
522
523        # one hour after last modification : must return 304
524        new_dt = self.last_modif_datetime + datetime.timedelta(hours=1)
525        headers = email.message.Message()
526        headers['If-Modified-Since'] = email.utils.format_datetime(new_dt,
527            usegmt=True)
528        response = self.request(self.base_url + '/test', headers=headers)
529        self.check_status_and_reason(response, HTTPStatus.NOT_MODIFIED)
530
531    def test_browser_cache_file_changed(self):
532        # with If-Modified-Since earlier than Last-Modified, must return 200
533        dt = self.last_modif_datetime
534        # build datetime object : 365 days before last modification
535        old_dt = dt - datetime.timedelta(days=365)
536        headers = email.message.Message()
537        headers['If-Modified-Since'] = email.utils.format_datetime(old_dt,
538            usegmt=True)
539        response = self.request(self.base_url + '/test', headers=headers)
540        self.check_status_and_reason(response, HTTPStatus.OK)
541
542    def test_browser_cache_with_If_None_Match_header(self):
543        # if If-None-Match header is present, ignore If-Modified-Since
544
545        headers = email.message.Message()
546        headers['If-Modified-Since'] = self.last_modif_header
547        headers['If-None-Match'] = "*"
548        response = self.request(self.base_url + '/test', headers=headers)
549        self.check_status_and_reason(response, HTTPStatus.OK)
550
551    def test_invalid_requests(self):
552        response = self.request('/', method='FOO')
553        self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)
554        # requests must be case sensitive,so this should fail too
555        response = self.request('/', method='custom')
556        self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)
557        response = self.request('/', method='GETs')
558        self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)
559
560    def test_last_modified(self):
561        """Checks that the datetime returned in Last-Modified response header
562        is the actual datetime of last modification, rounded to the second
563        """
564        response = self.request(self.base_url + '/test')
565        self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
566        last_modif_header = response.headers['Last-modified']
567        self.assertEqual(last_modif_header, self.last_modif_header)
568
569    def test_path_without_leading_slash(self):
570        response = self.request(self.tempdir_name + '/test')
571        self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
572        response = self.request(self.tempdir_name + '/test/')
573        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
574        response = self.request(self.tempdir_name + '/')
575        self.check_status_and_reason(response, HTTPStatus.OK)
576        response = self.request(self.tempdir_name)
577        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
578        response = self.request(self.tempdir_name + '/?hi=2')
579        self.check_status_and_reason(response, HTTPStatus.OK)
580        response = self.request(self.tempdir_name + '?hi=1')
581        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
582        self.assertEqual(response.getheader("Location"),
583                         self.tempdir_name + "/?hi=1")
584
585    def test_html_escape_filename(self):
586        filename = '<test&>.txt'
587        fullpath = os.path.join(self.tempdir, filename)
588
589        try:
590            open(fullpath, 'w').close()
591        except OSError:
592            raise unittest.SkipTest('Can not create file %s on current file '
593                                    'system' % filename)
594
595        try:
596            response = self.request(self.base_url + '/')
597            body = self.check_status_and_reason(response, HTTPStatus.OK)
598            enc = response.headers.get_content_charset()
599        finally:
600            os.unlink(fullpath)  # avoid affecting test_undecodable_filename
601
602        self.assertIsNotNone(enc)
603        html_text = '>%s<' % html.escape(filename, quote=False)
604        self.assertIn(html_text.encode(enc), body)
605
606
607cgi_file1 = """\
608#!%s
609
610print("Content-type: text/html")
611print()
612print("Hello World")
613"""
614
615cgi_file2 = """\
616#!%s
617import cgi
618
619print("Content-type: text/html")
620print()
621
622form = cgi.FieldStorage()
623print("%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"),
624                          form.getfirst("bacon")))
625"""
626
627cgi_file4 = """\
628#!%s
629import os
630
631print("Content-type: text/html")
632print()
633
634print(os.environ["%s"])
635"""
636
637
638@unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
639        "This test can't be run reliably as root (issue #13308).")
640class CGIHTTPServerTestCase(BaseTestCase):
641    class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler):
642        pass
643
644    linesep = os.linesep.encode('ascii')
645
646    def setUp(self):
647        BaseTestCase.setUp(self)
648        self.cwd = os.getcwd()
649        self.parent_dir = tempfile.mkdtemp()
650        self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin')
651        self.cgi_child_dir = os.path.join(self.cgi_dir, 'child-dir')
652        os.mkdir(self.cgi_dir)
653        os.mkdir(self.cgi_child_dir)
654        self.nocgi_path = None
655        self.file1_path = None
656        self.file2_path = None
657        self.file3_path = None
658        self.file4_path = None
659
660        # The shebang line should be pure ASCII: use symlink if possible.
661        # See issue #7668.
662        self._pythonexe_symlink = None
663        if support.can_symlink():
664            self.pythonexe = os.path.join(self.parent_dir, 'python')
665            self._pythonexe_symlink = support.PythonSymlink(self.pythonexe).__enter__()
666        else:
667            self.pythonexe = sys.executable
668
669        try:
670            # The python executable path is written as the first line of the
671            # CGI Python script. The encoding cookie cannot be used, and so the
672            # path should be encodable to the default script encoding (utf-8)
673            self.pythonexe.encode('utf-8')
674        except UnicodeEncodeError:
675            self.tearDown()
676            self.skipTest("Python executable path is not encodable to utf-8")
677
678        self.nocgi_path = os.path.join(self.parent_dir, 'nocgi.py')
679        with open(self.nocgi_path, 'w') as fp:
680            fp.write(cgi_file1 % self.pythonexe)
681        os.chmod(self.nocgi_path, 0o777)
682
683        self.file1_path = os.path.join(self.cgi_dir, 'file1.py')
684        with open(self.file1_path, 'w', encoding='utf-8') as file1:
685            file1.write(cgi_file1 % self.pythonexe)
686        os.chmod(self.file1_path, 0o777)
687
688        self.file2_path = os.path.join(self.cgi_dir, 'file2.py')
689        with open(self.file2_path, 'w', encoding='utf-8') as file2:
690            file2.write(cgi_file2 % self.pythonexe)
691        os.chmod(self.file2_path, 0o777)
692
693        self.file3_path = os.path.join(self.cgi_child_dir, 'file3.py')
694        with open(self.file3_path, 'w', encoding='utf-8') as file3:
695            file3.write(cgi_file1 % self.pythonexe)
696        os.chmod(self.file3_path, 0o777)
697
698        self.file4_path = os.path.join(self.cgi_dir, 'file4.py')
699        with open(self.file4_path, 'w', encoding='utf-8') as file4:
700            file4.write(cgi_file4 % (self.pythonexe, 'QUERY_STRING'))
701        os.chmod(self.file4_path, 0o777)
702
703        os.chdir(self.parent_dir)
704
705    def tearDown(self):
706        try:
707            os.chdir(self.cwd)
708            if self._pythonexe_symlink:
709                self._pythonexe_symlink.__exit__(None, None, None)
710            if self.nocgi_path:
711                os.remove(self.nocgi_path)
712            if self.file1_path:
713                os.remove(self.file1_path)
714            if self.file2_path:
715                os.remove(self.file2_path)
716            if self.file3_path:
717                os.remove(self.file3_path)
718            if self.file4_path:
719                os.remove(self.file4_path)
720            os.rmdir(self.cgi_child_dir)
721            os.rmdir(self.cgi_dir)
722            os.rmdir(self.parent_dir)
723        finally:
724            BaseTestCase.tearDown(self)
725
726    def test_url_collapse_path(self):
727        # verify tail is the last portion and head is the rest on proper urls
728        test_vectors = {
729            '': '//',
730            '..': IndexError,
731            '/.//..': IndexError,
732            '/': '//',
733            '//': '//',
734            '/\\': '//\\',
735            '/.//': '//',
736            'cgi-bin/file1.py': '/cgi-bin/file1.py',
737            '/cgi-bin/file1.py': '/cgi-bin/file1.py',
738            'a': '//a',
739            '/a': '//a',
740            '//a': '//a',
741            './a': '//a',
742            './C:/': '/C:/',
743            '/a/b': '/a/b',
744            '/a/b/': '/a/b/',
745            '/a/b/.': '/a/b/',
746            '/a/b/c/..': '/a/b/',
747            '/a/b/c/../d': '/a/b/d',
748            '/a/b/c/../d/e/../f': '/a/b/d/f',
749            '/a/b/c/../d/e/../../f': '/a/b/f',
750            '/a/b/c/../d/e/.././././..//f': '/a/b/f',
751            '../a/b/c/../d/e/.././././..//f': IndexError,
752            '/a/b/c/../d/e/../../../f': '/a/f',
753            '/a/b/c/../d/e/../../../../f': '//f',
754            '/a/b/c/../d/e/../../../../../f': IndexError,
755            '/a/b/c/../d/e/../../../../f/..': '//',
756            '/a/b/c/../d/e/../../../../f/../.': '//',
757        }
758        for path, expected in test_vectors.items():
759            if isinstance(expected, type) and issubclass(expected, Exception):
760                self.assertRaises(expected,
761                                  server._url_collapse_path, path)
762            else:
763                actual = server._url_collapse_path(path)
764                self.assertEqual(expected, actual,
765                                 msg='path = %r\nGot:    %r\nWanted: %r' %
766                                 (path, actual, expected))
767
768    def test_headers_and_content(self):
769        res = self.request('/cgi-bin/file1.py')
770        self.assertEqual(
771            (res.read(), res.getheader('Content-type'), res.status),
772            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK))
773
774    def test_issue19435(self):
775        res = self.request('///////////nocgi.py/../cgi-bin/nothere.sh')
776        self.assertEqual(res.status, HTTPStatus.NOT_FOUND)
777
778    def test_post(self):
779        params = urllib.parse.urlencode(
780            {'spam' : 1, 'eggs' : 'python', 'bacon' : 123456})
781        headers = {'Content-type' : 'application/x-www-form-urlencoded'}
782        res = self.request('/cgi-bin/file2.py', 'POST', params, headers)
783
784        self.assertEqual(res.read(), b'1, python, 123456' + self.linesep)
785
786    def test_invaliduri(self):
787        res = self.request('/cgi-bin/invalid')
788        res.read()
789        self.assertEqual(res.status, HTTPStatus.NOT_FOUND)
790
791    def test_authorization(self):
792        headers = {b'Authorization' : b'Basic ' +
793                   base64.b64encode(b'username:pass')}
794        res = self.request('/cgi-bin/file1.py', 'GET', headers=headers)
795        self.assertEqual(
796            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
797            (res.read(), res.getheader('Content-type'), res.status))
798
799    def test_no_leading_slash(self):
800        # http://bugs.python.org/issue2254
801        res = self.request('cgi-bin/file1.py')
802        self.assertEqual(
803            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
804            (res.read(), res.getheader('Content-type'), res.status))
805
806    def test_os_environ_is_not_altered(self):
807        signature = "Test CGI Server"
808        os.environ['SERVER_SOFTWARE'] = signature
809        res = self.request('/cgi-bin/file1.py')
810        self.assertEqual(
811            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
812            (res.read(), res.getheader('Content-type'), res.status))
813        self.assertEqual(os.environ['SERVER_SOFTWARE'], signature)
814
815    def test_urlquote_decoding_in_cgi_check(self):
816        res = self.request('/cgi-bin%2ffile1.py')
817        self.assertEqual(
818            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
819            (res.read(), res.getheader('Content-type'), res.status))
820
821    def test_nested_cgi_path_issue21323(self):
822        res = self.request('/cgi-bin/child-dir/file3.py')
823        self.assertEqual(
824            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
825            (res.read(), res.getheader('Content-type'), res.status))
826
827    def test_query_with_multiple_question_mark(self):
828        res = self.request('/cgi-bin/file4.py?a=b?c=d')
829        self.assertEqual(
830            (b'a=b?c=d' + self.linesep, 'text/html', HTTPStatus.OK),
831            (res.read(), res.getheader('Content-type'), res.status))
832
833    def test_query_with_continuous_slashes(self):
834        res = self.request('/cgi-bin/file4.py?k=aa%2F%2Fbb&//q//p//=//a//b//')
835        self.assertEqual(
836            (b'k=aa%2F%2Fbb&//q//p//=//a//b//' + self.linesep,
837             'text/html', HTTPStatus.OK),
838            (res.read(), res.getheader('Content-type'), res.status))
839
840
841class SocketlessRequestHandler(SimpleHTTPRequestHandler):
842    def __init__(self, *args, **kwargs):
843        request = mock.Mock()
844        request.makefile.return_value = BytesIO()
845        super().__init__(request, None, None)
846
847        self.get_called = False
848        self.protocol_version = "HTTP/1.1"
849
850    def do_GET(self):
851        self.get_called = True
852        self.send_response(HTTPStatus.OK)
853        self.send_header('Content-Type', 'text/html')
854        self.end_headers()
855        self.wfile.write(b'<html><body>Data</body></html>\r\n')
856
857    def log_message(self, format, *args):
858        pass
859
860class RejectingSocketlessRequestHandler(SocketlessRequestHandler):
861    def handle_expect_100(self):
862        self.send_error(HTTPStatus.EXPECTATION_FAILED)
863        return False
864
865
866class AuditableBytesIO:
867
868    def __init__(self):
869        self.datas = []
870
871    def write(self, data):
872        self.datas.append(data)
873
874    def getData(self):
875        return b''.join(self.datas)
876
877    @property
878    def numWrites(self):
879        return len(self.datas)
880
881
882class BaseHTTPRequestHandlerTestCase(unittest.TestCase):
883    """Test the functionality of the BaseHTTPServer.
884
885       Test the support for the Expect 100-continue header.
886       """
887
888    HTTPResponseMatch = re.compile(b'HTTP/1.[0-9]+ 200 OK')
889
890    def setUp (self):
891        self.handler = SocketlessRequestHandler()
892
893    def send_typical_request(self, message):
894        input = BytesIO(message)
895        output = BytesIO()
896        self.handler.rfile = input
897        self.handler.wfile = output
898        self.handler.handle_one_request()
899        output.seek(0)
900        return output.readlines()
901
902    def verify_get_called(self):
903        self.assertTrue(self.handler.get_called)
904
905    def verify_expected_headers(self, headers):
906        for fieldName in b'Server: ', b'Date: ', b'Content-Type: ':
907            self.assertEqual(sum(h.startswith(fieldName) for h in headers), 1)
908
909    def verify_http_server_response(self, response):
910        match = self.HTTPResponseMatch.search(response)
911        self.assertIsNotNone(match)
912
913    def test_http_1_1(self):
914        result = self.send_typical_request(b'GET / HTTP/1.1\r\n\r\n')
915        self.verify_http_server_response(result[0])
916        self.verify_expected_headers(result[1:-1])
917        self.verify_get_called()
918        self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
919        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
920        self.assertEqual(self.handler.command, 'GET')
921        self.assertEqual(self.handler.path, '/')
922        self.assertEqual(self.handler.request_version, 'HTTP/1.1')
923        self.assertSequenceEqual(self.handler.headers.items(), ())
924
925    def test_http_1_0(self):
926        result = self.send_typical_request(b'GET / HTTP/1.0\r\n\r\n')
927        self.verify_http_server_response(result[0])
928        self.verify_expected_headers(result[1:-1])
929        self.verify_get_called()
930        self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
931        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.0')
932        self.assertEqual(self.handler.command, 'GET')
933        self.assertEqual(self.handler.path, '/')
934        self.assertEqual(self.handler.request_version, 'HTTP/1.0')
935        self.assertSequenceEqual(self.handler.headers.items(), ())
936
937    def test_http_0_9(self):
938        result = self.send_typical_request(b'GET / HTTP/0.9\r\n\r\n')
939        self.assertEqual(len(result), 1)
940        self.assertEqual(result[0], b'<html><body>Data</body></html>\r\n')
941        self.verify_get_called()
942
943    def test_extra_space(self):
944        result = self.send_typical_request(
945            b'GET /spaced out HTTP/1.1\r\n'
946            b'Host: dummy\r\n'
947            b'\r\n'
948        )
949        self.assertTrue(result[0].startswith(b'HTTP/1.1 400 '))
950        self.verify_expected_headers(result[1:result.index(b'\r\n')])
951        self.assertFalse(self.handler.get_called)
952
953    def test_with_continue_1_0(self):
954        result = self.send_typical_request(b'GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n')
955        self.verify_http_server_response(result[0])
956        self.verify_expected_headers(result[1:-1])
957        self.verify_get_called()
958        self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
959        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.0')
960        self.assertEqual(self.handler.command, 'GET')
961        self.assertEqual(self.handler.path, '/')
962        self.assertEqual(self.handler.request_version, 'HTTP/1.0')
963        headers = (("Expect", "100-continue"),)
964        self.assertSequenceEqual(self.handler.headers.items(), headers)
965
966    def test_with_continue_1_1(self):
967        result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
968        self.assertEqual(result[0], b'HTTP/1.1 100 Continue\r\n')
969        self.assertEqual(result[1], b'\r\n')
970        self.assertEqual(result[2], b'HTTP/1.1 200 OK\r\n')
971        self.verify_expected_headers(result[2:-1])
972        self.verify_get_called()
973        self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
974        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
975        self.assertEqual(self.handler.command, 'GET')
976        self.assertEqual(self.handler.path, '/')
977        self.assertEqual(self.handler.request_version, 'HTTP/1.1')
978        headers = (("Expect", "100-continue"),)
979        self.assertSequenceEqual(self.handler.headers.items(), headers)
980
981    def test_header_buffering_of_send_error(self):
982
983        input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
984        output = AuditableBytesIO()
985        handler = SocketlessRequestHandler()
986        handler.rfile = input
987        handler.wfile = output
988        handler.request_version = 'HTTP/1.1'
989        handler.requestline = ''
990        handler.command = None
991
992        handler.send_error(418)
993        self.assertEqual(output.numWrites, 2)
994
995    def test_header_buffering_of_send_response_only(self):
996
997        input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
998        output = AuditableBytesIO()
999        handler = SocketlessRequestHandler()
1000        handler.rfile = input
1001        handler.wfile = output
1002        handler.request_version = 'HTTP/1.1'
1003
1004        handler.send_response_only(418)
1005        self.assertEqual(output.numWrites, 0)
1006        handler.end_headers()
1007        self.assertEqual(output.numWrites, 1)
1008
1009    def test_header_buffering_of_send_header(self):
1010
1011        input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
1012        output = AuditableBytesIO()
1013        handler = SocketlessRequestHandler()
1014        handler.rfile = input
1015        handler.wfile = output
1016        handler.request_version = 'HTTP/1.1'
1017
1018        handler.send_header('Foo', 'foo')
1019        handler.send_header('bar', 'bar')
1020        self.assertEqual(output.numWrites, 0)
1021        handler.end_headers()
1022        self.assertEqual(output.getData(), b'Foo: foo\r\nbar: bar\r\n\r\n')
1023        self.assertEqual(output.numWrites, 1)
1024
1025    def test_header_unbuffered_when_continue(self):
1026
1027        def _readAndReseek(f):
1028            pos = f.tell()
1029            f.seek(0)
1030            data = f.read()
1031            f.seek(pos)
1032            return data
1033
1034        input = BytesIO(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
1035        output = BytesIO()
1036        self.handler.rfile = input
1037        self.handler.wfile = output
1038        self.handler.request_version = 'HTTP/1.1'
1039
1040        self.handler.handle_one_request()
1041        self.assertNotEqual(_readAndReseek(output), b'')
1042        result = _readAndReseek(output).split(b'\r\n')
1043        self.assertEqual(result[0], b'HTTP/1.1 100 Continue')
1044        self.assertEqual(result[1], b'')
1045        self.assertEqual(result[2], b'HTTP/1.1 200 OK')
1046
1047    def test_with_continue_rejected(self):
1048        usual_handler = self.handler        # Save to avoid breaking any subsequent tests.
1049        self.handler = RejectingSocketlessRequestHandler()
1050        result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
1051        self.assertEqual(result[0], b'HTTP/1.1 417 Expectation Failed\r\n')
1052        self.verify_expected_headers(result[1:-1])
1053        # The expect handler should short circuit the usual get method by
1054        # returning false here, so get_called should be false
1055        self.assertFalse(self.handler.get_called)
1056        self.assertEqual(sum(r == b'Connection: close\r\n' for r in result[1:-1]), 1)
1057        self.handler = usual_handler        # Restore to avoid breaking any subsequent tests.
1058
1059    def test_request_length(self):
1060        # Issue #10714: huge request lines are discarded, to avoid Denial
1061        # of Service attacks.
1062        result = self.send_typical_request(b'GET ' + b'x' * 65537)
1063        self.assertEqual(result[0], b'HTTP/1.1 414 Request-URI Too Long\r\n')
1064        self.assertFalse(self.handler.get_called)
1065        self.assertIsInstance(self.handler.requestline, str)
1066
1067    def test_header_length(self):
1068        # Issue #6791: same for headers
1069        result = self.send_typical_request(
1070            b'GET / HTTP/1.1\r\nX-Foo: bar' + b'r' * 65537 + b'\r\n\r\n')
1071        self.assertEqual(result[0], b'HTTP/1.1 431 Line too long\r\n')
1072        self.assertFalse(self.handler.get_called)
1073        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
1074
1075    def test_too_many_headers(self):
1076        result = self.send_typical_request(
1077            b'GET / HTTP/1.1\r\n' + b'X-Foo: bar\r\n' * 101 + b'\r\n')
1078        self.assertEqual(result[0], b'HTTP/1.1 431 Too many headers\r\n')
1079        self.assertFalse(self.handler.get_called)
1080        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
1081
1082    def test_html_escape_on_error(self):
1083        result = self.send_typical_request(
1084            b'<script>alert("hello")</script> / HTTP/1.1')
1085        result = b''.join(result)
1086        text = '<script>alert("hello")</script>'
1087        self.assertIn(html.escape(text, quote=False).encode('ascii'), result)
1088
1089    def test_close_connection(self):
1090        # handle_one_request() should be repeatedly called until
1091        # it sets close_connection
1092        def handle_one_request():
1093            self.handler.close_connection = next(close_values)
1094        self.handler.handle_one_request = handle_one_request
1095
1096        close_values = iter((True,))
1097        self.handler.handle()
1098        self.assertRaises(StopIteration, next, close_values)
1099
1100        close_values = iter((False, False, True))
1101        self.handler.handle()
1102        self.assertRaises(StopIteration, next, close_values)
1103
1104    def test_date_time_string(self):
1105        now = time.time()
1106        # this is the old code that formats the timestamp
1107        year, month, day, hh, mm, ss, wd, y, z = time.gmtime(now)
1108        expected = "%s, %02d %3s %4d %02d:%02d:%02d GMT" % (
1109            self.handler.weekdayname[wd],
1110            day,
1111            self.handler.monthname[month],
1112            year, hh, mm, ss
1113        )
1114        self.assertEqual(self.handler.date_time_string(timestamp=now), expected)
1115
1116
1117class SimpleHTTPRequestHandlerTestCase(unittest.TestCase):
1118    """ Test url parsing """
1119    def setUp(self):
1120        self.translated = os.getcwd()
1121        self.translated = os.path.join(self.translated, 'filename')
1122        self.handler = SocketlessRequestHandler()
1123
1124    def test_query_arguments(self):
1125        path = self.handler.translate_path('/filename')
1126        self.assertEqual(path, self.translated)
1127        path = self.handler.translate_path('/filename?foo=bar')
1128        self.assertEqual(path, self.translated)
1129        path = self.handler.translate_path('/filename?a=b&spam=eggs#zot')
1130        self.assertEqual(path, self.translated)
1131
1132    def test_start_with_double_slash(self):
1133        path = self.handler.translate_path('//filename')
1134        self.assertEqual(path, self.translated)
1135        path = self.handler.translate_path('//filename?foo=bar')
1136        self.assertEqual(path, self.translated)
1137
1138    def test_windows_colon(self):
1139        with support.swap_attr(server.os, 'path', ntpath):
1140            path = self.handler.translate_path('c:c:c:foo/filename')
1141            path = path.replace(ntpath.sep, os.sep)
1142            self.assertEqual(path, self.translated)
1143
1144            path = self.handler.translate_path('\\c:../filename')
1145            path = path.replace(ntpath.sep, os.sep)
1146            self.assertEqual(path, self.translated)
1147
1148            path = self.handler.translate_path('c:\\c:..\\foo/filename')
1149            path = path.replace(ntpath.sep, os.sep)
1150            self.assertEqual(path, self.translated)
1151
1152            path = self.handler.translate_path('c:c:foo\\c:c:bar/filename')
1153            path = path.replace(ntpath.sep, os.sep)
1154            self.assertEqual(path, self.translated)
1155
1156
1157class MiscTestCase(unittest.TestCase):
1158    def test_all(self):
1159        expected = []
1160        blacklist = {'executable', 'nobody_uid', 'test'}
1161        for name in dir(server):
1162            if name.startswith('_') or name in blacklist:
1163                continue
1164            module_object = getattr(server, name)
1165            if getattr(module_object, '__module__', None) == 'http.server':
1166                expected.append(name)
1167        self.assertCountEqual(server.__all__, expected)
1168
1169
1170class ScriptTestCase(unittest.TestCase):
1171
1172    def mock_server_class(self):
1173        return mock.MagicMock(
1174            return_value=mock.MagicMock(
1175                __enter__=mock.MagicMock(
1176                    return_value=mock.MagicMock(
1177                        socket=mock.MagicMock(
1178                            getsockname=lambda: ('', 0),
1179                        ),
1180                    ),
1181                ),
1182            ),
1183        )
1184
1185    @mock.patch('builtins.print')
1186    def test_server_test_unspec(self, _):
1187        mock_server = self.mock_server_class()
1188        server.test(ServerClass=mock_server, bind=None)
1189        self.assertIn(
1190            mock_server.address_family,
1191            (socket.AF_INET6, socket.AF_INET),
1192        )
1193
1194    @mock.patch('builtins.print')
1195    def test_server_test_localhost(self, _):
1196        mock_server = self.mock_server_class()
1197        server.test(ServerClass=mock_server, bind="localhost")
1198        self.assertIn(
1199            mock_server.address_family,
1200            (socket.AF_INET6, socket.AF_INET),
1201        )
1202
1203    ipv6_addrs = (
1204        "::",
1205        "2001:0db8:85a3:0000:0000:8a2e:0370:7334",
1206        "::1",
1207    )
1208
1209    ipv4_addrs = (
1210        "0.0.0.0",
1211        "8.8.8.8",
1212        "127.0.0.1",
1213    )
1214
1215    @mock.patch('builtins.print')
1216    def test_server_test_ipv6(self, _):
1217        for bind in self.ipv6_addrs:
1218            mock_server = self.mock_server_class()
1219            server.test(ServerClass=mock_server, bind=bind)
1220            self.assertEqual(mock_server.address_family, socket.AF_INET6)
1221
1222    @mock.patch('builtins.print')
1223    def test_server_test_ipv4(self, _):
1224        for bind in self.ipv4_addrs:
1225            mock_server = self.mock_server_class()
1226            server.test(ServerClass=mock_server, bind=bind)
1227            self.assertEqual(mock_server.address_family, socket.AF_INET)
1228
1229
1230def test_main(verbose=None):
1231    cwd = os.getcwd()
1232    try:
1233        support.run_unittest(
1234            RequestHandlerLoggingTestCase,
1235            BaseHTTPRequestHandlerTestCase,
1236            BaseHTTPServerTestCase,
1237            SimpleHTTPServerTestCase,
1238            CGIHTTPServerTestCase,
1239            SimpleHTTPRequestHandlerTestCase,
1240            MiscTestCase,
1241            ScriptTestCase
1242        )
1243    finally:
1244        os.chdir(cwd)
1245
1246if __name__ == '__main__':
1247    test_main()
1248