• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Unittests for the various HTTPServer modules.
2
3Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>,
4Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.
5"""
6from collections import OrderedDict
7from http.server import BaseHTTPRequestHandler, HTTPServer, \
8     SimpleHTTPRequestHandler, CGIHTTPRequestHandler
9from http import server, HTTPStatus
10
11import os
12import socket
13import sys
14import re
15import base64
16import ntpath
17import pathlib
18import shutil
19import email.message
20import email.utils
21import html
22import http, http.client
23import urllib.parse
24import tempfile
25import time
26import datetime
27import threading
28from unittest import mock
29from io import BytesIO
30
31import unittest
32from test import support
33
34
class NoLogRequestHandler:
    """Mixin that silences handler logging and offers a dummy ``read()``."""

    def log_message(self, *args):
        """Accept any log arguments and drop them (nothing goes to stderr)."""
        return None

    def read(self, n=None):
        """Return an empty string, whatever size *n* was asked for."""
        return str()
42
43
class TestServerThread(threading.Thread):
    """Thread that runs an HTTPServer on behalf of a test case.

    *test_object* is the test case: ``run()`` stores the bound address on it
    as ``HOST``/``PORT`` and sets its ``server_started`` event.
    *request_handler* is the handler class given to the HTTPServer.
    """

    def __init__(self, test_object, request_handler):
        super().__init__()
        self.test_object = test_object
        self.request_handler = request_handler

    def run(self):
        """Create the server on localhost port 0, publish its address, serve."""
        httpd = self.server = HTTPServer(('localhost', 0),
                                         self.request_handler)
        owner = self.test_object
        owner.HOST, owner.PORT = httpd.socket.getsockname()
        owner.server_started.set()
        # The test case is not needed any more; drop the reference.
        self.test_object = None
        try:
            httpd.serve_forever(0.05)
        finally:
            httpd.server_close()

    def stop(self):
        """Shut the server down and wait for this thread to finish."""
        self.server.shutdown()
        self.join()
63
64
class BaseTestCase(unittest.TestCase):
    """Shared fixture that serves ``self.request_handler`` from a thread.

    Subclasses supply a ``request_handler`` class attribute.  After
    ``setUp()`` the server address is available as ``self.HOST`` and
    ``self.PORT`` (both are set by TestServerThread.run()).
    """

    def setUp(self):
        """Guard os.environ, start the server thread and wait until it is up."""
        self._threads = support.threading_setup()
        os.environ = support.EnvironmentVarGuard()
        self.server_started = threading.Event()
        self.thread = TestServerThread(self, self.request_handler)
        self.thread.start()
        self.server_started.wait()

    def tearDown(self):
        """Stop the server thread, then undo the environment guard."""
        self.thread.stop()
        self.thread = None
        os.environ.__exit__()
        support.threading_cleanup(*self._threads)

    def request(self, uri, method='GET', body=None, headers=None):
        """Send one request over a new connection and return the response.

        The connection is kept on ``self.connection``.  *headers* may be any
        mapping accepted by ``HTTPConnection.request()``; ``None`` (the
        default) means no extra headers.
        """
        # A None sentinel avoids sharing one mutable dict between all calls.
        if headers is None:
            headers = {}
        self.connection = http.client.HTTPConnection(self.HOST, self.PORT)
        self.connection.request(method, uri, body, headers)
        return self.connection.getresponse()
84
85
class BaseHTTPServerTestCase(BaseTestCase):
    """Tests for BaseHTTPRequestHandler, driven through ``self.con``."""

    class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
        """Handler with one made-up HTTP method per scenario under test."""

        protocol_version = 'HTTP/1.1'
        default_request_version = 'HTTP/1.1'

        def _send_html_headers(self, status, connection):
            """Send *status* plus text/html and the given Connection header."""
            self.send_response(status)
            self.send_header('Content-Type', 'text/html')
            self.send_header('Connection', connection)
            self.end_headers()

        def do_TEST(self):
            self._send_html_headers(HTTPStatus.NO_CONTENT, 'close')

        def do_KEEP(self):
            self._send_html_headers(HTTPStatus.NO_CONTENT, 'keep-alive')

        def do_KEYERROR(self):
            self.send_error(999)

        def do_NOTFOUND(self):
            self.send_error(HTTPStatus.NOT_FOUND)

        def do_EXPLAINERROR(self):
            short = "Short Message"
            explanation = "This is a long \n explanation"
            self.send_error(999, short, explanation)

        def do_CUSTOM(self):
            self._send_html_headers(999, 'close')

        def do_LATINONEHEADER(self):
            self.send_response(999)
            self.send_header('X-Special', 'Dängerous Mind')
            self.send_header('Connection', 'close')
            self.end_headers()
            # Echo the incoming header value back as the UTF-8 encoded body.
            incoming = self.headers['x-special-incoming']
            self.wfile.write(incoming.encode('utf-8'))

        def do_SEND_ERROR(self):
            # The path is "/<code>"; reply with that status via send_error().
            status = int(self.path[1:])
            self.send_error(status)

        # HEAD requests are answered by the very same code.
        do_HEAD = do_SEND_ERROR

    # Statuses for which the tests do not check the Transfer-Encoding header.
    _transfer_encoding_allowed = (HTTPStatus.NOT_MODIFIED,
                                  HTTPStatus.RESET_CONTENT)

    def setUp(self):
        super().setUp()
        self.con = http.client.HTTPConnection(self.HOST, self.PORT)
        self.con.connect()

    def _exchange(self, method, uri='/', **request_kwargs):
        """Issue ``self.con.request()`` and return the response."""
        self.con.request(method, uri, **request_kwargs)
        return self.con.getresponse()

    def _exchange_raw(self, method, uri='/', *, version=None, headers=()):
        """Send a request via putrequest()/endheaders(); return the response.

        When *version* is not None it replaces the connection's
        ``_http_vsn_str`` first.  *headers* is an iterable of (name, value)
        pairs passed to putheader().
        """
        if version is not None:
            self.con._http_vsn_str = version
        self.con.putrequest(method, uri)
        for name, value in headers:
            self.con.putheader(name, value)
        self.con.endheaders()
        return self.con.getresponse()

    def test_command(self):
        res = self._exchange('GET')
        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)

    def test_request_line_trimming(self):
        res = self._exchange_raw('XYZBOGUS', version='HTTP/1.1\n')
        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)

    def test_version_bogus(self):
        res = self._exchange_raw('GET', version='FUBAR')
        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)

    def test_version_digits(self):
        res = self._exchange_raw('GET', version='HTTP/9.9.9')
        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)

    def test_version_none_get(self):
        res = self._exchange_raw('GET', version='')
        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)

    def test_version_none(self):
        # Test that a valid method is rejected when not HTTP/1.x
        res = self._exchange_raw('CUSTOM', version='')
        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)

    def test_version_invalid(self):
        self.con._http_vsn = 99
        res = self._exchange_raw('GET', version='HTTP/9.9')
        self.assertEqual(res.status, HTTPStatus.HTTP_VERSION_NOT_SUPPORTED)

    def test_send_blank(self):
        res = self._exchange_raw('', '', version='')
        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)

    def test_header_close(self):
        res = self._exchange_raw('GET', headers=[('Connection', 'close')])
        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)

    def test_header_keep_alive(self):
        res = self._exchange_raw('GET', version='HTTP/1.1',
                                 headers=[('Connection', 'keep-alive')])
        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)

    def test_handler(self):
        res = self._exchange('TEST')
        self.assertEqual(res.status, HTTPStatus.NO_CONTENT)

    def test_return_header_keep_alive(self):
        res = self._exchange('KEEP')
        self.assertEqual(res.getheader('Connection'), 'keep-alive')
        self.con.request('TEST', '/')
        self.addCleanup(self.con.close)

    def test_internal_key_error(self):
        res = self._exchange('KEYERROR')
        self.assertEqual(res.status, 999)

    def test_return_custom_status(self):
        res = self._exchange('CUSTOM')
        self.assertEqual(res.status, 999)

    def test_return_explain_error(self):
        res = self._exchange('EXPLAINERROR')
        self.assertEqual(res.status, 999)
        self.assertTrue(int(res.getheader('Content-Length')))

    def test_latin1_header(self):
        res = self._exchange('LATINONEHEADER', headers={
            'X-Special-Incoming':       'Ärger mit Unicode'
        })
        self.assertEqual(res.getheader('X-Special'), 'Dängerous Mind')
        self.assertEqual(res.read(), 'Ärger mit Unicode'.encode('utf-8'))

    def test_error_content_length(self):
        # Issue #16088: standard error responses should have a content-length
        res = self._exchange('NOTFOUND')
        self.assertEqual(res.status, HTTPStatus.NOT_FOUND)

        payload = res.read()
        self.assertEqual(int(res.getheader('Content-Length')), len(payload))

    def test_send_error(self):
        statuses = (HTTPStatus.NO_CONTENT, HTTPStatus.NOT_MODIFIED,
                    HTTPStatus.PROCESSING, HTTPStatus.RESET_CONTENT,
                    HTTPStatus.SWITCHING_PROTOCOLS)
        for code in statuses:
            res = self._exchange('SEND_ERROR', f'/{code}')
            self.assertEqual(code, res.status)
            self.assertIsNone(res.getheader('Content-Length'))
            self.assertIsNone(res.getheader('Content-Type'))
            if code not in self._transfer_encoding_allowed:
                self.assertIsNone(res.getheader('Transfer-Encoding'))

            self.assertEqual(b'', res.read())

    def test_head_via_send_error(self):
        statuses = (HTTPStatus.OK, HTTPStatus.NO_CONTENT,
                    HTTPStatus.NOT_MODIFIED, HTTPStatus.RESET_CONTENT,
                    HTTPStatus.SWITCHING_PROTOCOLS)
        for code in statuses:
            res = self._exchange('HEAD', f'/{code}')
            self.assertEqual(code, res.status)
            if code == HTTPStatus.OK:
                self.assertGreater(int(res.getheader('Content-Length')), 0)
                self.assertIn('text/html', res.getheader('Content-Type'))
            else:
                self.assertIsNone(res.getheader('Content-Length'))
                self.assertIsNone(res.getheader('Content-Type'))
            if code not in self._transfer_encoding_allowed:
                self.assertIsNone(res.getheader('Transfer-Encoding'))

            self.assertEqual(b'', res.read())
291
292
class RequestHandlerLoggingTestCase(BaseTestCase):
    """Check what BaseHTTPRequestHandler writes to stderr for each request."""

    class request_handler(BaseHTTPRequestHandler):
        """Handler that keeps the default logging (no NoLogRequestHandler)."""

        protocol_version = 'HTTP/1.1'
        default_request_version = 'HTTP/1.1'

        def do_GET(self):
            self.send_response(HTTPStatus.OK)
            self.end_headers()

        def do_ERROR(self):
            self.send_error(HTTPStatus.NOT_FOUND, 'File not found')

    def _stderr_of_request(self, method):
        """Open ``self.con``, send *method* for "/", return captured stderr."""
        self.con = http.client.HTTPConnection(self.HOST, self.PORT)
        self.con.connect()

        with support.captured_stderr() as captured:
            self.con.request(method, '/')
            # The response object is deliberately not kept.
            self.con.getresponse()

        return captured.getvalue()

    def test_get(self):
        logged = self._stderr_of_request('GET')
        self.assertTrue(logged.endswith('"GET / HTTP/1.1" 200 -\n'))

    def test_err(self):
        first, second = self._stderr_of_request('ERROR').split('\n')[:2]
        self.assertTrue(first.endswith('code 404, message File not found'))
        self.assertTrue(second.endswith('"ERROR / HTTP/1.1" 404 -'))
327
328
class SimpleHTTPServerTestCase(BaseTestCase):
    """Tests for SimpleHTTPRequestHandler serving a temporary directory.

    setUp() changes the working directory to the system temp directory and
    creates a fresh sub-directory in it containing one file named "test";
    ``self.base_url`` is the URL path of that sub-directory.
    """

    class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler):
        pass

    def setUp(self):
        super().setUp()
        self.cwd = os.getcwd()
        basetempdir = tempfile.gettempdir()
        os.chdir(basetempdir)
        self.data = b'We are the knights who say Ni!'
        self.tempdir = tempfile.mkdtemp(dir=basetempdir)
        self.tempdir_name = os.path.basename(self.tempdir)
        self.base_url = '/' + self.tempdir_name
        tempname = os.path.join(self.tempdir, 'test')
        with open(tempname, 'wb') as temp:
            temp.write(self.data)
        mtime = os.stat(tempname).st_mtime
        # compute last modification datetime for browser cache tests
        last_modif = datetime.datetime.fromtimestamp(mtime,
            datetime.timezone.utc)
        self.last_modif_datetime = last_modif.replace(microsecond=0)
        self.last_modif_header = email.utils.formatdate(
            last_modif.timestamp(), usegmt=True)

    def tearDown(self):
        try:
            os.chdir(self.cwd)
            # Best effort only: a directory that cannot be removed must not
            # fail the test, but KeyboardInterrupt/SystemExit still propagate.
            shutil.rmtree(self.tempdir, ignore_errors=True)
        finally:
            super().tearDown()

    def check_status_and_reason(self, response, status, data=None):
        """Read *response*, verify it, and return the body.

        Asserts the status equals *status*, that a reason is present, that
        the body equals *data* when *data* is truthy, and that the server
        answered with HTTP/1.0, without keep-alive and without trailing data.
        """
        reader = None

        def close_conn():
            """Don't close reader yet so we can check if there was leftover
            buffered input"""
            nonlocal reader
            reader = response.fp
            response.fp = None

        response._close_conn = close_conn

        body = response.read()
        self.assertTrue(response)
        self.assertEqual(response.status, status)
        self.assertIsNotNone(response.reason)
        if data:
            self.assertEqual(data, body)
        # Ensure the server has not set up a persistent connection, and has
        # not sent any extra data
        self.assertEqual(response.version, 10)
        self.assertEqual(response.msg.get("Connection", "close"), "close")
        self.assertEqual(reader.read(30), b'', 'Connection should be closed')

        reader.close()
        return body

    @unittest.skipIf(sys.platform == 'darwin',
                     'undecodable name cannot always be decoded on macOS')
    @unittest.skipIf(sys.platform == 'win32',
                     'undecodable name cannot be decoded on win32')
    @unittest.skipUnless(support.TESTFN_UNDECODABLE,
                         'need support.TESTFN_UNDECODABLE')
    def test_undecodable_filename(self):
        # No macOS-specific handling is needed here: the test is skipped on
        # darwin by the decorator above.
        enc = sys.getfilesystemencoding()
        filename = os.fsdecode(support.TESTFN_UNDECODABLE) + '.txt'
        with open(os.path.join(self.tempdir, filename), 'wb') as f:
            f.write(support.TESTFN_UNDECODABLE)
        response = self.request(self.base_url + '/')
        body = self.check_status_and_reason(response, HTTPStatus.OK)
        quotedname = urllib.parse.quote(filename, errors='surrogatepass')
        self.assertIn(('href="%s"' % quotedname)
                      .encode(enc, 'surrogateescape'), body)
        self.assertIn(('>%s<' % html.escape(filename, quote=False))
                      .encode(enc, 'surrogateescape'), body)
        response = self.request(self.base_url + '/' + quotedname)
        self.check_status_and_reason(response, HTTPStatus.OK,
                                     data=support.TESTFN_UNDECODABLE)

    def test_get_dir_redirect_location_domain_injection_bug(self):
        """Ensure //evil.co/..%2f../../X does not put //evil.co/ in Location.

        //netloc/ in a Location header is a redirect to a new host.
        https://github.com/python/cpython/issues/87389

        This checks that a path resolving to a directory on our server cannot
        resolve into a redirect to another server.
        """
        os.mkdir(os.path.join(self.tempdir, 'existing_directory'))
        url = f'/python.org/..%2f..%2f..%2f..%2f..%2f../%0a%0d/../{self.tempdir_name}/existing_directory'
        expected_location = f'{url}/'  # /python.org.../ single slash single prefix, trailing slash
        # Canonicalizes to /tmp/tempdir_name/existing_directory which does
        # exist and is a dir, triggering the 301 redirect logic.
        response = self.request(url)
        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
        location = response.getheader('Location')
        self.assertEqual(location, expected_location, msg='non-attack failed!')

        # //python.org... multi-slash prefix, no trailing slash
        attack_url = f'/{url}'
        response = self.request(attack_url)
        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
        location = response.getheader('Location')
        self.assertFalse(location.startswith('//'), msg=location)
        self.assertEqual(location, expected_location,
                msg='Expected Location header to start with a single / and '
                'end with a / as this is a directory redirect.')

        # ///python.org... triple-slash prefix, no trailing slash
        attack3_url = f'//{url}'
        response = self.request(attack3_url)
        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
        self.assertEqual(response.getheader('Location'), expected_location)

        # If the second word in the http request (Request-URI for the http
        # method) is a full URI, we don't worry about it, as that'll be parsed
        # and reassembled as a full URI within
        # SimpleHTTPRequestHandler.send_head so no errant scheme-less
        # //netloc//evil.co/ domain mixup can happen.
        attack_scheme_netloc_2slash_url = f'https://pypi.org/{url}'
        expected_scheme_netloc_location = f'{attack_scheme_netloc_2slash_url}/'
        response = self.request(attack_scheme_netloc_2slash_url)
        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
        location = response.getheader('Location')
        # We're just ensuring that the scheme and domain make it through, if
        # there are or aren't multiple slashes at the start of the path that
        # follows that isn't important in this Location: header.
        self.assertTrue(location.startswith('https://pypi.org/'), msg=location)

    def test_get(self):
        #constructs the path relative to the root directory of the HTTPServer
        response = self.request(self.base_url + '/test')
        self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
        # check for trailing "/" which should return 404. See Issue17324
        response = self.request(self.base_url + '/test/')
        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
        response = self.request(self.base_url + '/')
        self.check_status_and_reason(response, HTTPStatus.OK)
        response = self.request(self.base_url)
        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
        response = self.request(self.base_url + '/?hi=2')
        self.check_status_and_reason(response, HTTPStatus.OK)
        response = self.request(self.base_url + '?hi=1')
        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
        self.assertEqual(response.getheader("Location"),
                         self.base_url + "/?hi=1")
        response = self.request('/ThisDoesNotExist')
        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
        response = self.request('/' + 'ThisDoesNotExist' + '/')
        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)

        data = b"Dummy index file\r\n"
        # Use the absolute directory so the write does not depend on the
        # current working directory.
        with open(os.path.join(self.tempdir, 'index.html'), 'wb') as f:
            f.write(data)
        response = self.request(self.base_url + '/')
        self.check_status_and_reason(response, HTTPStatus.OK, data)

        # chmod() doesn't work as expected on Windows, and filesystem
        # permissions are ignored by root on Unix.
        if os.name == 'posix' and os.geteuid() != 0:
            os.chmod(self.tempdir, 0)
            try:
                response = self.request(self.base_url + '/')
                self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
            finally:
                os.chmod(self.tempdir, 0o755)

    def test_head(self):
        response = self.request(
            self.base_url + '/test', method='HEAD')
        self.check_status_and_reason(response, HTTPStatus.OK)
        self.assertEqual(response.getheader('content-length'),
                         str(len(self.data)))
        self.assertEqual(response.getheader('content-type'),
                         'application/octet-stream')

    def test_browser_cache(self):
        """Check that when a request to /test is sent with the request header
        If-Modified-Since set to date of last modification, the server returns
        status code 304, not 200
        """
        headers = email.message.Message()
        headers['If-Modified-Since'] = self.last_modif_header
        response = self.request(self.base_url + '/test', headers=headers)
        self.check_status_and_reason(response, HTTPStatus.NOT_MODIFIED)

        # one hour after last modification : must return 304
        new_dt = self.last_modif_datetime + datetime.timedelta(hours=1)
        headers = email.message.Message()
        headers['If-Modified-Since'] = email.utils.format_datetime(new_dt,
            usegmt=True)
        response = self.request(self.base_url + '/test', headers=headers)
        self.check_status_and_reason(response, HTTPStatus.NOT_MODIFIED)

    def test_browser_cache_file_changed(self):
        # with If-Modified-Since earlier than Last-Modified, must return 200
        dt = self.last_modif_datetime
        # build datetime object : 365 days before last modification
        old_dt = dt - datetime.timedelta(days=365)
        headers = email.message.Message()
        headers['If-Modified-Since'] = email.utils.format_datetime(old_dt,
            usegmt=True)
        response = self.request(self.base_url + '/test', headers=headers)
        self.check_status_and_reason(response, HTTPStatus.OK)

    def test_browser_cache_with_If_None_Match_header(self):
        # if If-None-Match header is present, ignore If-Modified-Since

        headers = email.message.Message()
        headers['If-Modified-Since'] = self.last_modif_header
        headers['If-None-Match'] = "*"
        response = self.request(self.base_url + '/test', headers=headers)
        self.check_status_and_reason(response, HTTPStatus.OK)

    def test_invalid_requests(self):
        response = self.request('/', method='FOO')
        self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)
        # requests must be case sensitive,so this should fail too
        response = self.request('/', method='custom')
        self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)
        response = self.request('/', method='GETs')
        self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)

    def test_last_modified(self):
        """Checks that the datetime returned in Last-Modified response header
        is the actual datetime of last modification, rounded to the second
        """
        response = self.request(self.base_url + '/test')
        self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
        last_modif_header = response.headers['Last-modified']
        self.assertEqual(last_modif_header, self.last_modif_header)

    def test_path_without_leading_slash(self):
        response = self.request(self.tempdir_name + '/test')
        self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
        response = self.request(self.tempdir_name + '/test/')
        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
        response = self.request(self.tempdir_name + '/')
        self.check_status_and_reason(response, HTTPStatus.OK)
        response = self.request(self.tempdir_name)
        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
        response = self.request(self.tempdir_name + '/?hi=2')
        self.check_status_and_reason(response, HTTPStatus.OK)
        response = self.request(self.tempdir_name + '?hi=1')
        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
        self.assertEqual(response.getheader("Location"),
                         self.tempdir_name + "/?hi=1")

    def test_html_escape_filename(self):
        filename = '<test&>.txt'
        fullpath = os.path.join(self.tempdir, filename)

        try:
            open(fullpath, 'w').close()
        except OSError:
            raise unittest.SkipTest('Can not create file %s on current file '
                                    'system' % filename)

        try:
            response = self.request(self.base_url + '/')
            body = self.check_status_and_reason(response, HTTPStatus.OK)
            enc = response.headers.get_content_charset()
        finally:
            os.unlink(fullpath)  # avoid affecting test_undecodable_filename

        self.assertIsNotNone(enc)
        html_text = '>%s<' % html.escape(filename, quote=False)
        self.assertIn(html_text.encode(enc), body)
606
607
# Source templates for the CGI scripts written to disk by
# CGIHTTPServerTestCase.setUp().  Each is filled in with the % operator; the
# first %s is the interpreter path used on the "#!" line.

# Minimal script: prints a text/html header, a blank line and "Hello World".
cgi_file1 = """\
#!%s

print("Content-type: text/html")
print()
print("Hello World")
"""

# Prints the "spam", "eggs" and "bacon" form fields.  The doubled %% survive
# the template substitution as single % characters in the generated script.
cgi_file2 = """\
#!%s
import cgi

print("Content-type: text/html")
print()

form = cgi.FieldStorage()
print("%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"),
                          form.getfirst("bacon")))
"""

# Prints one environment variable; its name is the second %s placeholder.
cgi_file4 = """\
#!%s
import os

print("Content-type: text/html")
print()

print(os.environ["%s"])
"""

# Prints repr(os.environ) as text/plain.
cgi_file6 = """\
#!%s
import os

print("Content-type: text/plain")
print()
print(repr(os.environ))
"""
646
647
648@unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
649        "This test can't be run reliably as root (issue #13308).")
650class CGIHTTPServerTestCase(BaseTestCase):
651    class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler):
652        pass
653
654    linesep = os.linesep.encode('ascii')
655
656    def setUp(self):
657        BaseTestCase.setUp(self)
658        self.cwd = os.getcwd()
659        self.parent_dir = tempfile.mkdtemp()
660        self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin')
661        self.cgi_child_dir = os.path.join(self.cgi_dir, 'child-dir')
662        self.sub_dir_1 = os.path.join(self.parent_dir, 'sub')
663        self.sub_dir_2 = os.path.join(self.sub_dir_1, 'dir')
664        self.cgi_dir_in_sub_dir = os.path.join(self.sub_dir_2, 'cgi-bin')
665        os.mkdir(self.cgi_dir)
666        os.mkdir(self.cgi_child_dir)
667        os.mkdir(self.sub_dir_1)
668        os.mkdir(self.sub_dir_2)
669        os.mkdir(self.cgi_dir_in_sub_dir)
670        self.nocgi_path = None
671        self.file1_path = None
672        self.file2_path = None
673        self.file3_path = None
674        self.file4_path = None
675        self.file5_path = None
676
677        # The shebang line should be pure ASCII: use symlink if possible.
678        # See issue #7668.
679        self._pythonexe_symlink = None
680        if support.can_symlink():
681            self.pythonexe = os.path.join(self.parent_dir, 'python')
682            self._pythonexe_symlink = support.PythonSymlink(self.pythonexe).__enter__()
683        else:
684            self.pythonexe = sys.executable
685
686        try:
687            # The python executable path is written as the first line of the
688            # CGI Python script. The encoding cookie cannot be used, and so the
689            # path should be encodable to the default script encoding (utf-8)
690            self.pythonexe.encode('utf-8')
691        except UnicodeEncodeError:
692            self.tearDown()
693            self.skipTest("Python executable path is not encodable to utf-8")
694
695        self.nocgi_path = os.path.join(self.parent_dir, 'nocgi.py')
696        with open(self.nocgi_path, 'w') as fp:
697            fp.write(cgi_file1 % self.pythonexe)
698        os.chmod(self.nocgi_path, 0o777)
699
700        self.file1_path = os.path.join(self.cgi_dir, 'file1.py')
701        with open(self.file1_path, 'w', encoding='utf-8') as file1:
702            file1.write(cgi_file1 % self.pythonexe)
703        os.chmod(self.file1_path, 0o777)
704
705        self.file2_path = os.path.join(self.cgi_dir, 'file2.py')
706        with open(self.file2_path, 'w', encoding='utf-8') as file2:
707            file2.write(cgi_file2 % self.pythonexe)
708        os.chmod(self.file2_path, 0o777)
709
710        self.file3_path = os.path.join(self.cgi_child_dir, 'file3.py')
711        with open(self.file3_path, 'w', encoding='utf-8') as file3:
712            file3.write(cgi_file1 % self.pythonexe)
713        os.chmod(self.file3_path, 0o777)
714
715        self.file4_path = os.path.join(self.cgi_dir, 'file4.py')
716        with open(self.file4_path, 'w', encoding='utf-8') as file4:
717            file4.write(cgi_file4 % (self.pythonexe, 'QUERY_STRING'))
718        os.chmod(self.file4_path, 0o777)
719
720        self.file5_path = os.path.join(self.cgi_dir_in_sub_dir, 'file5.py')
721        with open(self.file5_path, 'w', encoding='utf-8') as file5:
722            file5.write(cgi_file1 % self.pythonexe)
723        os.chmod(self.file5_path, 0o777)
724
725        self.file6_path = os.path.join(self.cgi_dir, 'file6.py')
726        with open(self.file6_path, 'w', encoding='utf-8') as file6:
727            file6.write(cgi_file6 % self.pythonexe)
728        os.chmod(self.file6_path, 0o777)
729
730        os.chdir(self.parent_dir)
731
732    def tearDown(self):
733        try:
734            os.chdir(self.cwd)
735            if self._pythonexe_symlink:
736                self._pythonexe_symlink.__exit__(None, None, None)
737            if self.nocgi_path:
738                os.remove(self.nocgi_path)
739            if self.file1_path:
740                os.remove(self.file1_path)
741            if self.file2_path:
742                os.remove(self.file2_path)
743            if self.file3_path:
744                os.remove(self.file3_path)
745            if self.file4_path:
746                os.remove(self.file4_path)
747            if self.file5_path:
748                os.remove(self.file5_path)
749            if self.file6_path:
750                os.remove(self.file6_path)
751            os.rmdir(self.cgi_child_dir)
752            os.rmdir(self.cgi_dir)
753            os.rmdir(self.cgi_dir_in_sub_dir)
754            os.rmdir(self.sub_dir_2)
755            os.rmdir(self.sub_dir_1)
756            os.rmdir(self.parent_dir)
757        finally:
758            BaseTestCase.tearDown(self)
759
760    def test_url_collapse_path(self):
761        # verify tail is the last portion and head is the rest on proper urls
762        test_vectors = {
763            '': '//',
764            '..': IndexError,
765            '/.//..': IndexError,
766            '/': '//',
767            '//': '//',
768            '/\\': '//\\',
769            '/.//': '//',
770            'cgi-bin/file1.py': '/cgi-bin/file1.py',
771            '/cgi-bin/file1.py': '/cgi-bin/file1.py',
772            'a': '//a',
773            '/a': '//a',
774            '//a': '//a',
775            './a': '//a',
776            './C:/': '/C:/',
777            '/a/b': '/a/b',
778            '/a/b/': '/a/b/',
779            '/a/b/.': '/a/b/',
780            '/a/b/c/..': '/a/b/',
781            '/a/b/c/../d': '/a/b/d',
782            '/a/b/c/../d/e/../f': '/a/b/d/f',
783            '/a/b/c/../d/e/../../f': '/a/b/f',
784            '/a/b/c/../d/e/.././././..//f': '/a/b/f',
785            '../a/b/c/../d/e/.././././..//f': IndexError,
786            '/a/b/c/../d/e/../../../f': '/a/f',
787            '/a/b/c/../d/e/../../../../f': '//f',
788            '/a/b/c/../d/e/../../../../../f': IndexError,
789            '/a/b/c/../d/e/../../../../f/..': '//',
790            '/a/b/c/../d/e/../../../../f/../.': '//',
791        }
792        for path, expected in test_vectors.items():
793            if isinstance(expected, type) and issubclass(expected, Exception):
794                self.assertRaises(expected,
795                                  server._url_collapse_path, path)
796            else:
797                actual = server._url_collapse_path(path)
798                self.assertEqual(expected, actual,
799                                 msg='path = %r\nGot:    %r\nWanted: %r' %
800                                 (path, actual, expected))
801
802    def test_headers_and_content(self):
803        res = self.request('/cgi-bin/file1.py')
804        self.assertEqual(
805            (res.read(), res.getheader('Content-type'), res.status),
806            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK))
807
808    def test_issue19435(self):
809        res = self.request('///////////nocgi.py/../cgi-bin/nothere.sh')
810        self.assertEqual(res.status, HTTPStatus.NOT_FOUND)
811
812    def test_post(self):
813        params = urllib.parse.urlencode(
814            {'spam' : 1, 'eggs' : 'python', 'bacon' : 123456})
815        headers = {'Content-type' : 'application/x-www-form-urlencoded'}
816        res = self.request('/cgi-bin/file2.py', 'POST', params, headers)
817
818        self.assertEqual(res.read(), b'1, python, 123456' + self.linesep)
819
820    def test_invaliduri(self):
821        res = self.request('/cgi-bin/invalid')
822        res.read()
823        self.assertEqual(res.status, HTTPStatus.NOT_FOUND)
824
825    def test_authorization(self):
826        headers = {b'Authorization' : b'Basic ' +
827                   base64.b64encode(b'username:pass')}
828        res = self.request('/cgi-bin/file1.py', 'GET', headers=headers)
829        self.assertEqual(
830            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
831            (res.read(), res.getheader('Content-type'), res.status))
832
833    def test_no_leading_slash(self):
834        # http://bugs.python.org/issue2254
835        res = self.request('cgi-bin/file1.py')
836        self.assertEqual(
837            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
838            (res.read(), res.getheader('Content-type'), res.status))
839
840    def test_os_environ_is_not_altered(self):
841        signature = "Test CGI Server"
842        os.environ['SERVER_SOFTWARE'] = signature
843        res = self.request('/cgi-bin/file1.py')
844        self.assertEqual(
845            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
846            (res.read(), res.getheader('Content-type'), res.status))
847        self.assertEqual(os.environ['SERVER_SOFTWARE'], signature)
848
849    def test_urlquote_decoding_in_cgi_check(self):
850        res = self.request('/cgi-bin%2ffile1.py')
851        self.assertEqual(
852            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
853            (res.read(), res.getheader('Content-type'), res.status))
854
855    def test_nested_cgi_path_issue21323(self):
856        res = self.request('/cgi-bin/child-dir/file3.py')
857        self.assertEqual(
858            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
859            (res.read(), res.getheader('Content-type'), res.status))
860
861    def test_query_with_multiple_question_mark(self):
862        res = self.request('/cgi-bin/file4.py?a=b?c=d')
863        self.assertEqual(
864            (b'a=b?c=d' + self.linesep, 'text/html', HTTPStatus.OK),
865            (res.read(), res.getheader('Content-type'), res.status))
866
867    def test_query_with_continuous_slashes(self):
868        res = self.request('/cgi-bin/file4.py?k=aa%2F%2Fbb&//q//p//=//a//b//')
869        self.assertEqual(
870            (b'k=aa%2F%2Fbb&//q//p//=//a//b//' + self.linesep,
871             'text/html', HTTPStatus.OK),
872            (res.read(), res.getheader('Content-type'), res.status))
873
874    def test_cgi_path_in_sub_directories(self):
875        try:
876            CGIHTTPRequestHandler.cgi_directories.append('/sub/dir/cgi-bin')
877            res = self.request('/sub/dir/cgi-bin/file5.py')
878            self.assertEqual(
879                (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
880                (res.read(), res.getheader('Content-type'), res.status))
881        finally:
882            CGIHTTPRequestHandler.cgi_directories.remove('/sub/dir/cgi-bin')
883
884    def test_accept(self):
885        browser_accept = \
886                    'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'
887        tests = (
888            ((('Accept', browser_accept),), browser_accept),
889            ((), ''),
890            # Hack case to get two values for the one header
891            ((('Accept', 'text/html'), ('ACCEPT', 'text/plain')),
892               'text/html,text/plain'),
893        )
894        for headers, expected in tests:
895            headers = OrderedDict(headers)
896            with self.subTest(headers):
897                res = self.request('/cgi-bin/file6.py', 'GET', headers=headers)
898                self.assertEqual(http.HTTPStatus.OK, res.status)
899                expected = f"'HTTP_ACCEPT': {expected!r}"
900                self.assertIn(expected.encode('ascii'), res.read())
901
902
903class SocketlessRequestHandler(SimpleHTTPRequestHandler):
904    def __init__(self, directory=None):
905        request = mock.Mock()
906        request.makefile.return_value = BytesIO()
907        super().__init__(request, None, None, directory=directory)
908
909        self.get_called = False
910        self.protocol_version = "HTTP/1.1"
911
912    def do_GET(self):
913        self.get_called = True
914        self.send_response(HTTPStatus.OK)
915        self.send_header('Content-Type', 'text/html')
916        self.end_headers()
917        self.wfile.write(b'<html><body>Data</body></html>\r\n')
918
919    def log_message(self, format, *args):
920        pass
921
class RejectingSocketlessRequestHandler(SocketlessRequestHandler):
    """Socketless handler whose Expect: 100-continue hook always refuses."""

    def handle_expect_100(self):
        """Send a 417 error and return False."""
        refusal = HTTPStatus.EXPECTATION_FAILED
        self.send_error(refusal)
        return False
926
927
class AuditableBytesIO:
    """Write-only sink that keeps every written chunk separately.

    Keeping the chunks apart lets a test count how many write() calls
    happened as well as inspect the combined payload.
    """

    def __init__(self):
        # One entry per write() call, in call order.
        self.datas = list()

    def write(self, data):
        """Store *data* as a new chunk."""
        self.datas += [data]

    def getData(self):
        """Return all chunks concatenated into one bytes object."""
        chunks = self.datas
        return b''.join(chunks)

    @property
    def numWrites(self):
        """Number of write() calls recorded so far."""
        chunks = self.datas
        return len(chunks)
942
943
class BaseHTTPRequestHandlerTestCase(unittest.TestCase):
    """Drive a socketless request handler with canned requests.

    Covers request-line parsing, header buffering, request size limits and
    the handling of the Expect: 100-continue header.
    """

    HTTPResponseMatch = re.compile(b'HTTP/1.[0-9]+ 200 OK')

    def setUp(self):
        self.handler = SocketlessRequestHandler()

    # -- helpers ---------------------------------------------------------

    def send_typical_request(self, message):
        """Run *message* through the handler; return the reply as lines."""
        request = BytesIO(message)
        reply = BytesIO()
        self.handler.rfile = request
        self.handler.wfile = reply
        self.handler.handle_one_request()
        reply.seek(0)
        return reply.readlines()

    def verify_get_called(self):
        """Assert that the handler's do_GET() ran."""
        self.assertTrue(self.handler.get_called)

    def verify_expected_headers(self, headers):
        """Assert Server, Date and Content-Type each occur exactly once."""
        for prefix in (b'Server: ', b'Date: ', b'Content-Type: '):
            hits = [line for line in headers if line.startswith(prefix)]
            self.assertEqual(len(hits), 1)

    def verify_http_server_response(self, response):
        """Assert that *response* contains a '200 OK' status line."""
        self.assertIsNotNone(self.HTTPResponseMatch.search(response))

    def _verify_ok_reply(self, result):
        """Check status line, headers, the do_GET flag and the body."""
        self.verify_http_server_response(result[0])
        self.verify_expected_headers(result[1:-1])
        self.verify_get_called()
        self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')

    def _verify_parsed_request(self, requestline, version, headers):
        """Check what the handler recorded about a 'GET /' request."""
        handler = self.handler
        self.assertEqual(handler.requestline, requestline)
        self.assertEqual(handler.command, 'GET')
        self.assertEqual(handler.path, '/')
        self.assertEqual(handler.request_version, version)
        self.assertSequenceEqual(handler.headers.items(), headers)

    def _audited_handler(self):
        """Return a fresh handler and the write-counting sink wired to it."""
        request = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
        sink = AuditableBytesIO()
        handler = SocketlessRequestHandler()
        handler.rfile = request
        handler.wfile = sink
        handler.request_version = 'HTTP/1.1'
        return handler, sink

    # -- request parsing -------------------------------------------------

    def test_http_1_1(self):
        result = self.send_typical_request(b'GET / HTTP/1.1\r\n\r\n')
        self._verify_ok_reply(result)
        self._verify_parsed_request('GET / HTTP/1.1', 'HTTP/1.1', ())

    def test_http_1_0(self):
        result = self.send_typical_request(b'GET / HTTP/1.0\r\n\r\n')
        self._verify_ok_reply(result)
        self._verify_parsed_request('GET / HTTP/1.0', 'HTTP/1.0', ())

    def test_http_0_9(self):
        # The whole reply is expected to be the single body line.
        lines = self.send_typical_request(b'GET / HTTP/0.9\r\n\r\n')
        self.assertEqual(len(lines), 1)
        self.assertEqual(lines[0], b'<html><body>Data</body></html>\r\n')
        self.verify_get_called()

    def test_extra_space(self):
        lines = self.send_typical_request(
            b'GET /spaced out HTTP/1.1\r\n'
            b'Host: dummy\r\n'
            b'\r\n'
        )
        self.assertTrue(lines[0].startswith(b'HTTP/1.1 400 '))
        # The headers end at the first blank line of the reply.
        blank_line = lines.index(b'\r\n')
        self.verify_expected_headers(lines[1:blank_line])
        self.assertFalse(self.handler.get_called)

    # -- Expect: 100-continue --------------------------------------------

    def test_with_continue_1_0(self):
        result = self.send_typical_request(b'GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n')
        self._verify_ok_reply(result)
        headers = (("Expect", "100-continue"),)
        self._verify_parsed_request('GET / HTTP/1.0', 'HTTP/1.0', headers)

    def test_with_continue_1_1(self):
        result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
        interim, separator, final = result[0], result[1], result[2]
        self.assertEqual(interim, b'HTTP/1.1 100 Continue\r\n')
        self.assertEqual(separator, b'\r\n')
        self.assertEqual(final, b'HTTP/1.1 200 OK\r\n')
        self.verify_expected_headers(result[2:-1])
        self.verify_get_called()
        self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
        headers = (("Expect", "100-continue"),)
        self._verify_parsed_request('GET / HTTP/1.1', 'HTTP/1.1', headers)

    # -- header buffering ------------------------------------------------

    def test_header_buffering_of_send_error(self):
        handler, sink = self._audited_handler()
        handler.requestline = ''
        handler.command = None

        handler.send_error(418)
        self.assertEqual(sink.numWrites, 2)

    def test_header_buffering_of_send_response_only(self):
        handler, sink = self._audited_handler()

        handler.send_response_only(418)
        self.assertEqual(sink.numWrites, 0)
        handler.end_headers()
        self.assertEqual(sink.numWrites, 1)

    def test_header_buffering_of_send_header(self):
        handler, sink = self._audited_handler()

        for field, value in (('Foo', 'foo'), ('bar', 'bar')):
            handler.send_header(field, value)
        self.assertEqual(sink.numWrites, 0)
        handler.end_headers()
        self.assertEqual(sink.getData(), b'Foo: foo\r\nbar: bar\r\n\r\n')
        self.assertEqual(sink.numWrites, 1)

    def test_header_unbuffered_when_continue(self):
        request = BytesIO(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
        reply = BytesIO()
        self.handler.rfile = request
        self.handler.wfile = reply
        self.handler.request_version = 'HTTP/1.1'

        self.handler.handle_one_request()
        # getvalue() returns the whole buffer without moving the stream
        # position.
        written = reply.getvalue()
        self.assertNotEqual(written, b'')
        pieces = written.split(b'\r\n')
        self.assertEqual(pieces[0], b'HTTP/1.1 100 Continue')
        self.assertEqual(pieces[1], b'')
        self.assertEqual(pieces[2], b'HTTP/1.1 200 OK')

    def test_with_continue_rejected(self):
        # Swap in a rejecting handler for this test and put the usual one
        # back at the end.
        usual_handler = self.handler
        self.handler = RejectingSocketlessRequestHandler()
        result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
        self.assertEqual(result[0], b'HTTP/1.1 417 Expectation Failed\r\n')
        header_lines = result[1:-1]
        self.verify_expected_headers(header_lines)
        # The expect hook returns False, which should short circuit the
        # usual GET dispatch, so get_called must still be False.
        self.assertFalse(self.handler.get_called)
        self.assertEqual(header_lines.count(b'Connection: close\r\n'), 1)
        self.handler = usual_handler

    # -- size limits -----------------------------------------------------

    def test_request_length(self):
        # Issue #10714: huge request lines are discarded, to avoid Denial
        # of Service attacks.
        oversized = b'GET ' + b'x' * 65537
        result = self.send_typical_request(oversized)
        self.assertEqual(result[0], b'HTTP/1.1 414 Request-URI Too Long\r\n')
        self.assertFalse(self.handler.get_called)
        self.assertIsInstance(self.handler.requestline, str)

    def test_header_length(self):
        # Issue #6791: same for headers
        oversized = b'GET / HTTP/1.1\r\nX-Foo: bar' + b'r' * 65537 + b'\r\n\r\n'
        result = self.send_typical_request(oversized)
        self.assertEqual(result[0], b'HTTP/1.1 431 Line too long\r\n')
        self.assertFalse(self.handler.get_called)
        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')

    def test_too_many_headers(self):
        flood = b'GET / HTTP/1.1\r\n' + b'X-Foo: bar\r\n' * 101 + b'\r\n'
        result = self.send_typical_request(flood)
        self.assertEqual(result[0], b'HTTP/1.1 431 Too many headers\r\n')
        self.assertFalse(self.handler.get_called)
        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')

    # -- miscellaneous ---------------------------------------------------

    def test_html_escape_on_error(self):
        lines = self.send_typical_request(
            b'<script>alert("hello")</script> / HTTP/1.1')
        reply = b''.join(lines)
        text = '<script>alert("hello")</script>'
        escaped = html.escape(text, quote=False).encode('ascii')
        self.assertIn(escaped, reply)

    def test_close_connection(self):
        # handle_one_request() should be repeatedly called until
        # it sets close_connection
        def handle_one_request():
            self.handler.close_connection = next(close_values)
        self.handler.handle_one_request = handle_one_request

        for sequence in ((True,), (False, False, True)):
            close_values = iter(sequence)
            self.handler.handle()
            # An exhausted iterator shows every value was consumed.
            self.assertRaises(StopIteration, next, close_values)

    def test_date_time_string(self):
        now = time.time()
        # this is the old code that formats the timestamp
        parts = time.gmtime(now)
        expected = "%s, %02d %3s %4d %02d:%02d:%02d GMT" % (
            self.handler.weekdayname[parts.tm_wday],
            parts.tm_mday,
            self.handler.monthname[parts.tm_mon],
            parts.tm_year, parts.tm_hour, parts.tm_min, parts.tm_sec,
        )
        formatted = self.handler.date_time_string(timestamp=now)
        self.assertEqual(formatted, expected)
1177
1178
class SimpleHTTPRequestHandlerTestCase(unittest.TestCase):
    """Check translate_path() for handlers with different directories."""

    def setUp(self):
        self.translated_1 = os.path.join(os.getcwd(), 'filename')
        self.translated_2 = os.path.join('foo', 'filename')
        self.translated_3 = os.path.join('bar', 'filename')
        self.handler_1 = SocketlessRequestHandler()
        self.handler_2 = SocketlessRequestHandler(directory='foo')
        self.handler_3 = SocketlessRequestHandler(directory=pathlib.PurePath('bar'))

    def _handler_cases(self):
        """Return (handler, expected translated path) pairs, in order."""
        return (
            (self.handler_1, self.translated_1),
            (self.handler_2, self.translated_2),
            (self.handler_3, self.translated_3),
        )

    def test_query_arguments(self):
        urls = (
            '/filename',
            '/filename?foo=bar',
            '/filename?a=b&spam=eggs#zot',
        )
        for url in urls:
            for handler, expected in self._handler_cases():
                path = handler.translate_path(url)
                self.assertEqual(path, expected)

    def test_start_with_double_slash(self):
        urls = (
            '//filename',
            '//filename?foo=bar',
        )
        for url in urls:
            for handler, expected in self._handler_cases():
                path = handler.translate_path(url)
                self.assertEqual(path, expected)

    def test_windows_colon(self):
        urls = (
            'c:c:c:foo/filename',
            '\\c:../filename',
            'c:\\c:..\\foo/filename',
            'c:c:foo\\c:c:bar/filename',
        )
        # Make the server module use ntpath for the duration of the checks.
        with support.swap_attr(server.os, 'path', ntpath):
            for url in urls:
                for handler, expected in self._handler_cases():
                    path = handler.translate_path(url)
                    # Normalise separators before comparing.
                    path = path.replace(ntpath.sep, os.sep)
                    self.assertEqual(path, expected)
1267
1268
class MiscTestCase(unittest.TestCase):
    """Consistency check between http.server's contents and its __all__."""

    def test_all(self):
        # Names excluded from the comparison even though they live in the
        # module.
        skipped = {'executable', 'nobody_uid', 'test'}
        candidates = (
            name for name in dir(server)
            if not name.startswith('_') and name not in skipped
        )
        # Keep only objects that report http.server as their home module.
        expected = [
            name for name in candidates
            if getattr(getattr(server, name), '__module__', None)
            == 'http.server'
        ]
        self.assertCountEqual(server.__all__, expected)
1280
1281
class ScriptTestCase(unittest.TestCase):
    """Check which address family server.test() picks for a bind address."""

    ipv6_addrs = (
        "::",
        "2001:0db8:85a3:0000:0000:8a2e:0370:7334",
        "::1",
    )

    ipv4_addrs = (
        "0.0.0.0",
        "8.8.8.8",
        "127.0.0.1",
    )

    def mock_server_class(self):
        """Return a mock server class usable as a context manager.

        Calling it yields an instance whose __enter__() returns an object
        with socket.getsockname() == ('', 0).
        """
        sock = mock.MagicMock(getsockname=lambda: ('', 0))
        httpd = mock.MagicMock(socket=sock)
        instance = mock.MagicMock(
            __enter__=mock.MagicMock(return_value=httpd),
        )
        return mock.MagicMock(return_value=instance)

    def _family_for(self, bind):
        """Run server.test() against a mock; return the family it set."""
        mock_server = self.mock_server_class()
        server.test(ServerClass=mock_server, bind=bind)
        return mock_server.address_family

    @mock.patch('builtins.print')
    def test_server_test_unspec(self, _):
        either_family = (socket.AF_INET6, socket.AF_INET)
        self.assertIn(self._family_for(None), either_family)

    @mock.patch('builtins.print')
    def test_server_test_localhost(self, _):
        either_family = (socket.AF_INET6, socket.AF_INET)
        self.assertIn(self._family_for("localhost"), either_family)

    @mock.patch('builtins.print')
    def test_server_test_ipv6(self, _):
        for bind in self.ipv6_addrs:
            family = self._family_for(bind)
            self.assertEqual(family, socket.AF_INET6)

    @mock.patch('builtins.print')
    def test_server_test_ipv4(self, _):
        for bind in self.ipv4_addrs:
            family = self._family_for(bind)
            self.assertEqual(family, socket.AF_INET)
1340
1341
def test_main(verbose=None):
    """Run the listed test cases, then restore the working directory."""
    original_dir = os.getcwd()
    try:
        cases = (
            RequestHandlerLoggingTestCase,
            BaseHTTPRequestHandlerTestCase,
            BaseHTTPServerTestCase,
            SimpleHTTPServerTestCase,
            CGIHTTPServerTestCase,
            SimpleHTTPRequestHandlerTestCase,
            MiscTestCase,
            ScriptTestCase,
        )
        support.run_unittest(*cases)
    finally:
        # Go back to the starting directory whatever the outcome.
        os.chdir(original_dir)


# Run the suite when this file is executed as a script.
if __name__ == '__main__':
    test_main()
1360