• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Unittests for the various HTTPServer modules.
2
3Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>,
4Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.
5"""
6from collections import OrderedDict
7from http.server import BaseHTTPRequestHandler, HTTPServer, \
8     SimpleHTTPRequestHandler, CGIHTTPRequestHandler
9from http import server, HTTPStatus
10
11import os
12import socket
13import sys
14import re
15import base64
16import ntpath
17import pathlib
18import shutil
19import email.message
20import email.utils
21import html
22import http, http.client
23import urllib.parse
24import tempfile
25import time
26import datetime
27import threading
28from unittest import mock
29from io import BytesIO
30
31import unittest
32from test import support
33from test.support import os_helper
34from test.support import threading_helper
35
36
37class NoLogRequestHandler:
38    def log_message(self, *args):
39        # don't write log messages to stderr
40        pass
41
42    def read(self, n=None):
43        return ''
44
45
46class TestServerThread(threading.Thread):
47    def __init__(self, test_object, request_handler):
48        threading.Thread.__init__(self)
49        self.request_handler = request_handler
50        self.test_object = test_object
51
52    def run(self):
53        self.server = HTTPServer(('localhost', 0), self.request_handler)
54        self.test_object.HOST, self.test_object.PORT = self.server.socket.getsockname()
55        self.test_object.server_started.set()
56        self.test_object = None
57        try:
58            self.server.serve_forever(0.05)
59        finally:
60            self.server.server_close()
61
62    def stop(self):
63        self.server.shutdown()
64        self.join()
65
66
67class BaseTestCase(unittest.TestCase):
68    def setUp(self):
69        self._threads = threading_helper.threading_setup()
70        os.environ = os_helper.EnvironmentVarGuard()
71        self.server_started = threading.Event()
72        self.thread = TestServerThread(self, self.request_handler)
73        self.thread.start()
74        self.server_started.wait()
75
76    def tearDown(self):
77        self.thread.stop()
78        self.thread = None
79        os.environ.__exit__()
80        threading_helper.threading_cleanup(*self._threads)
81
82    def request(self, uri, method='GET', body=None, headers={}):
83        self.connection = http.client.HTTPConnection(self.HOST, self.PORT)
84        self.connection.request(method, uri, body, headers)
85        return self.connection.getresponse()
86
87
88class BaseHTTPServerTestCase(BaseTestCase):
89    class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
90        protocol_version = 'HTTP/1.1'
91        default_request_version = 'HTTP/1.1'
92
93        def do_TEST(self):
94            self.send_response(HTTPStatus.NO_CONTENT)
95            self.send_header('Content-Type', 'text/html')
96            self.send_header('Connection', 'close')
97            self.end_headers()
98
99        def do_KEEP(self):
100            self.send_response(HTTPStatus.NO_CONTENT)
101            self.send_header('Content-Type', 'text/html')
102            self.send_header('Connection', 'keep-alive')
103            self.end_headers()
104
105        def do_KEYERROR(self):
106            self.send_error(999)
107
108        def do_NOTFOUND(self):
109            self.send_error(HTTPStatus.NOT_FOUND)
110
111        def do_EXPLAINERROR(self):
112            self.send_error(999, "Short Message",
113                            "This is a long \n explanation")
114
115        def do_CUSTOM(self):
116            self.send_response(999)
117            self.send_header('Content-Type', 'text/html')
118            self.send_header('Connection', 'close')
119            self.end_headers()
120
121        def do_LATINONEHEADER(self):
122            self.send_response(999)
123            self.send_header('X-Special', 'Dängerous Mind')
124            self.send_header('Connection', 'close')
125            self.end_headers()
126            body = self.headers['x-special-incoming'].encode('utf-8')
127            self.wfile.write(body)
128
129        def do_SEND_ERROR(self):
130            self.send_error(int(self.path[1:]))
131
132        def do_HEAD(self):
133            self.send_error(int(self.path[1:]))
134
135    def setUp(self):
136        BaseTestCase.setUp(self)
137        self.con = http.client.HTTPConnection(self.HOST, self.PORT)
138        self.con.connect()
139
140    def test_command(self):
141        self.con.request('GET', '/')
142        res = self.con.getresponse()
143        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
144
145    def test_request_line_trimming(self):
146        self.con._http_vsn_str = 'HTTP/1.1\n'
147        self.con.putrequest('XYZBOGUS', '/')
148        self.con.endheaders()
149        res = self.con.getresponse()
150        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
151
152    def test_version_bogus(self):
153        self.con._http_vsn_str = 'FUBAR'
154        self.con.putrequest('GET', '/')
155        self.con.endheaders()
156        res = self.con.getresponse()
157        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
158
159    def test_version_digits(self):
160        self.con._http_vsn_str = 'HTTP/9.9.9'
161        self.con.putrequest('GET', '/')
162        self.con.endheaders()
163        res = self.con.getresponse()
164        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
165
166    def test_version_none_get(self):
167        self.con._http_vsn_str = ''
168        self.con.putrequest('GET', '/')
169        self.con.endheaders()
170        res = self.con.getresponse()
171        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
172
173    def test_version_none(self):
174        # Test that a valid method is rejected when not HTTP/1.x
175        self.con._http_vsn_str = ''
176        self.con.putrequest('CUSTOM', '/')
177        self.con.endheaders()
178        res = self.con.getresponse()
179        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
180
181    def test_version_invalid(self):
182        self.con._http_vsn = 99
183        self.con._http_vsn_str = 'HTTP/9.9'
184        self.con.putrequest('GET', '/')
185        self.con.endheaders()
186        res = self.con.getresponse()
187        self.assertEqual(res.status, HTTPStatus.HTTP_VERSION_NOT_SUPPORTED)
188
189    def test_send_blank(self):
190        self.con._http_vsn_str = ''
191        self.con.putrequest('', '')
192        self.con.endheaders()
193        res = self.con.getresponse()
194        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
195
196    def test_header_close(self):
197        self.con.putrequest('GET', '/')
198        self.con.putheader('Connection', 'close')
199        self.con.endheaders()
200        res = self.con.getresponse()
201        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
202
203    def test_header_keep_alive(self):
204        self.con._http_vsn_str = 'HTTP/1.1'
205        self.con.putrequest('GET', '/')
206        self.con.putheader('Connection', 'keep-alive')
207        self.con.endheaders()
208        res = self.con.getresponse()
209        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
210
211    def test_handler(self):
212        self.con.request('TEST', '/')
213        res = self.con.getresponse()
214        self.assertEqual(res.status, HTTPStatus.NO_CONTENT)
215
216    def test_return_header_keep_alive(self):
217        self.con.request('KEEP', '/')
218        res = self.con.getresponse()
219        self.assertEqual(res.getheader('Connection'), 'keep-alive')
220        self.con.request('TEST', '/')
221        self.addCleanup(self.con.close)
222
223    def test_internal_key_error(self):
224        self.con.request('KEYERROR', '/')
225        res = self.con.getresponse()
226        self.assertEqual(res.status, 999)
227
228    def test_return_custom_status(self):
229        self.con.request('CUSTOM', '/')
230        res = self.con.getresponse()
231        self.assertEqual(res.status, 999)
232
233    def test_return_explain_error(self):
234        self.con.request('EXPLAINERROR', '/')
235        res = self.con.getresponse()
236        self.assertEqual(res.status, 999)
237        self.assertTrue(int(res.getheader('Content-Length')))
238
239    def test_latin1_header(self):
240        self.con.request('LATINONEHEADER', '/', headers={
241            'X-Special-Incoming':       'Ärger mit Unicode'
242        })
243        res = self.con.getresponse()
244        self.assertEqual(res.getheader('X-Special'), 'Dängerous Mind')
245        self.assertEqual(res.read(), 'Ärger mit Unicode'.encode('utf-8'))
246
247    def test_error_content_length(self):
248        # Issue #16088: standard error responses should have a content-length
249        self.con.request('NOTFOUND', '/')
250        res = self.con.getresponse()
251        self.assertEqual(res.status, HTTPStatus.NOT_FOUND)
252
253        data = res.read()
254        self.assertEqual(int(res.getheader('Content-Length')), len(data))
255
256    def test_send_error(self):
257        allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED,
258                                         HTTPStatus.RESET_CONTENT)
259        for code in (HTTPStatus.NO_CONTENT, HTTPStatus.NOT_MODIFIED,
260                     HTTPStatus.PROCESSING, HTTPStatus.RESET_CONTENT,
261                     HTTPStatus.SWITCHING_PROTOCOLS):
262            self.con.request('SEND_ERROR', '/{}'.format(code))
263            res = self.con.getresponse()
264            self.assertEqual(code, res.status)
265            self.assertEqual(None, res.getheader('Content-Length'))
266            self.assertEqual(None, res.getheader('Content-Type'))
267            if code not in allow_transfer_encoding_codes:
268                self.assertEqual(None, res.getheader('Transfer-Encoding'))
269
270            data = res.read()
271            self.assertEqual(b'', data)
272
273    def test_head_via_send_error(self):
274        allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED,
275                                         HTTPStatus.RESET_CONTENT)
276        for code in (HTTPStatus.OK, HTTPStatus.NO_CONTENT,
277                     HTTPStatus.NOT_MODIFIED, HTTPStatus.RESET_CONTENT,
278                     HTTPStatus.SWITCHING_PROTOCOLS):
279            self.con.request('HEAD', '/{}'.format(code))
280            res = self.con.getresponse()
281            self.assertEqual(code, res.status)
282            if code == HTTPStatus.OK:
283                self.assertTrue(int(res.getheader('Content-Length')) > 0)
284                self.assertIn('text/html', res.getheader('Content-Type'))
285            else:
286                self.assertEqual(None, res.getheader('Content-Length'))
287                self.assertEqual(None, res.getheader('Content-Type'))
288            if code not in allow_transfer_encoding_codes:
289                self.assertEqual(None, res.getheader('Transfer-Encoding'))
290
291            data = res.read()
292            self.assertEqual(b'', data)
293
294
295class RequestHandlerLoggingTestCase(BaseTestCase):
296    class request_handler(BaseHTTPRequestHandler):
297        protocol_version = 'HTTP/1.1'
298        default_request_version = 'HTTP/1.1'
299
300        def do_GET(self):
301            self.send_response(HTTPStatus.OK)
302            self.end_headers()
303
304        def do_ERROR(self):
305            self.send_error(HTTPStatus.NOT_FOUND, 'File not found')
306
307    def test_get(self):
308        self.con = http.client.HTTPConnection(self.HOST, self.PORT)
309        self.con.connect()
310
311        with support.captured_stderr() as err:
312            self.con.request('GET', '/')
313            self.con.getresponse()
314
315        self.assertTrue(
316            err.getvalue().endswith('"GET / HTTP/1.1" 200 -\n'))
317
318    def test_err(self):
319        self.con = http.client.HTTPConnection(self.HOST, self.PORT)
320        self.con.connect()
321
322        with support.captured_stderr() as err:
323            self.con.request('ERROR', '/')
324            self.con.getresponse()
325
326        lines = err.getvalue().split('\n')
327        self.assertTrue(lines[0].endswith('code 404, message File not found'))
328        self.assertTrue(lines[1].endswith('"ERROR / HTTP/1.1" 404 -'))
329
330
331class SimpleHTTPServerTestCase(BaseTestCase):
332    class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler):
333        pass
334
335    def setUp(self):
336        super().setUp()
337        self.cwd = os.getcwd()
338        basetempdir = tempfile.gettempdir()
339        os.chdir(basetempdir)
340        self.data = b'We are the knights who say Ni!'
341        self.tempdir = tempfile.mkdtemp(dir=basetempdir)
342        self.tempdir_name = os.path.basename(self.tempdir)
343        self.base_url = '/' + self.tempdir_name
344        tempname = os.path.join(self.tempdir, 'test')
345        with open(tempname, 'wb') as temp:
346            temp.write(self.data)
347            temp.flush()
348        mtime = os.stat(tempname).st_mtime
349        # compute last modification datetime for browser cache tests
350        last_modif = datetime.datetime.fromtimestamp(mtime,
351            datetime.timezone.utc)
352        self.last_modif_datetime = last_modif.replace(microsecond=0)
353        self.last_modif_header = email.utils.formatdate(
354            last_modif.timestamp(), usegmt=True)
355
356    def tearDown(self):
357        try:
358            os.chdir(self.cwd)
359            try:
360                shutil.rmtree(self.tempdir)
361            except:
362                pass
363        finally:
364            super().tearDown()
365
366    def check_status_and_reason(self, response, status, data=None):
367        def close_conn():
368            """Don't close reader yet so we can check if there was leftover
369            buffered input"""
370            nonlocal reader
371            reader = response.fp
372            response.fp = None
373        reader = None
374        response._close_conn = close_conn
375
376        body = response.read()
377        self.assertTrue(response)
378        self.assertEqual(response.status, status)
379        self.assertIsNotNone(response.reason)
380        if data:
381            self.assertEqual(data, body)
382        # Ensure the server has not set up a persistent connection, and has
383        # not sent any extra data
384        self.assertEqual(response.version, 10)
385        self.assertEqual(response.msg.get("Connection", "close"), "close")
386        self.assertEqual(reader.read(30), b'', 'Connection should be closed')
387
388        reader.close()
389        return body
390
391    @unittest.skipIf(sys.platform == 'darwin',
392                     'undecodable name cannot always be decoded on macOS')
393    @unittest.skipIf(sys.platform == 'win32',
394                     'undecodable name cannot be decoded on win32')
395    @unittest.skipUnless(os_helper.TESTFN_UNDECODABLE,
396                         'need os_helper.TESTFN_UNDECODABLE')
397    def test_undecodable_filename(self):
398        enc = sys.getfilesystemencoding()
399        filename = os.fsdecode(os_helper.TESTFN_UNDECODABLE) + '.txt'
400        with open(os.path.join(self.tempdir, filename), 'wb') as f:
401            f.write(os_helper.TESTFN_UNDECODABLE)
402        response = self.request(self.base_url + '/')
403        if sys.platform == 'darwin':
404            # On Mac OS the HFS+ filesystem replaces bytes that aren't valid
405            # UTF-8 into a percent-encoded value.
406            for name in os.listdir(self.tempdir):
407                if name != 'test': # Ignore a filename created in setUp().
408                    filename = name
409                    break
410        body = self.check_status_and_reason(response, HTTPStatus.OK)
411        quotedname = urllib.parse.quote(filename, errors='surrogatepass')
412        self.assertIn(('href="%s"' % quotedname)
413                      .encode(enc, 'surrogateescape'), body)
414        self.assertIn(('>%s<' % html.escape(filename, quote=False))
415                      .encode(enc, 'surrogateescape'), body)
416        response = self.request(self.base_url + '/' + quotedname)
417        self.check_status_and_reason(response, HTTPStatus.OK,
418                                     data=os_helper.TESTFN_UNDECODABLE)
419
420    def test_get_dir_redirect_location_domain_injection_bug(self):
421        """Ensure //evil.co/..%2f../../X does not put //evil.co/ in Location.
422
423        //netloc/ in a Location header is a redirect to a new host.
424        https://github.com/python/cpython/issues/87389
425
426        This checks that a path resolving to a directory on our server cannot
427        resolve into a redirect to another server.
428        """
429        os.mkdir(os.path.join(self.tempdir, 'existing_directory'))
430        url = f'/python.org/..%2f..%2f..%2f..%2f..%2f../%0a%0d/../{self.tempdir_name}/existing_directory'
431        expected_location = f'{url}/'  # /python.org.../ single slash single prefix, trailing slash
432        # Canonicalizes to /tmp/tempdir_name/existing_directory which does
433        # exist and is a dir, triggering the 301 redirect logic.
434        response = self.request(url)
435        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
436        location = response.getheader('Location')
437        self.assertEqual(location, expected_location, msg='non-attack failed!')
438
439        # //python.org... multi-slash prefix, no trailing slash
440        attack_url = f'/{url}'
441        response = self.request(attack_url)
442        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
443        location = response.getheader('Location')
444        self.assertFalse(location.startswith('//'), msg=location)
445        self.assertEqual(location, expected_location,
446                msg='Expected Location header to start with a single / and '
447                'end with a / as this is a directory redirect.')
448
449        # ///python.org... triple-slash prefix, no trailing slash
450        attack3_url = f'//{url}'
451        response = self.request(attack3_url)
452        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
453        self.assertEqual(response.getheader('Location'), expected_location)
454
455        # If the second word in the http request (Request-URI for the http
456        # method) is a full URI, we don't worry about it, as that'll be parsed
457        # and reassembled as a full URI within BaseHTTPRequestHandler.send_head
458        # so no errant scheme-less //netloc//evil.co/ domain mixup can happen.
459        attack_scheme_netloc_2slash_url = f'https://pypi.org/{url}'
460        expected_scheme_netloc_location = f'{attack_scheme_netloc_2slash_url}/'
461        response = self.request(attack_scheme_netloc_2slash_url)
462        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
463        location = response.getheader('Location')
464        # We're just ensuring that the scheme and domain make it through, if
465        # there are or aren't multiple slashes at the start of the path that
466        # follows that isn't important in this Location: header.
467        self.assertTrue(location.startswith('https://pypi.org/'), msg=location)
468
469    def test_get(self):
470        #constructs the path relative to the root directory of the HTTPServer
471        response = self.request(self.base_url + '/test')
472        self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
473        # check for trailing "/" which should return 404. See Issue17324
474        response = self.request(self.base_url + '/test/')
475        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
476        response = self.request(self.base_url + '/')
477        self.check_status_and_reason(response, HTTPStatus.OK)
478        response = self.request(self.base_url)
479        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
480        self.assertEqual(response.getheader("Content-Length"), "0")
481        response = self.request(self.base_url + '/?hi=2')
482        self.check_status_and_reason(response, HTTPStatus.OK)
483        response = self.request(self.base_url + '?hi=1')
484        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
485        self.assertEqual(response.getheader("Location"),
486                         self.base_url + "/?hi=1")
487        response = self.request('/ThisDoesNotExist')
488        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
489        response = self.request('/' + 'ThisDoesNotExist' + '/')
490        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
491
492        data = b"Dummy index file\r\n"
493        with open(os.path.join(self.tempdir_name, 'index.html'), 'wb') as f:
494            f.write(data)
495        response = self.request(self.base_url + '/')
496        self.check_status_and_reason(response, HTTPStatus.OK, data)
497
498        # chmod() doesn't work as expected on Windows, and filesystem
499        # permissions are ignored by root on Unix.
500        if os.name == 'posix' and os.geteuid() != 0:
501            os.chmod(self.tempdir, 0)
502            try:
503                response = self.request(self.base_url + '/')
504                self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
505            finally:
506                os.chmod(self.tempdir, 0o755)
507
508    def test_head(self):
509        response = self.request(
510            self.base_url + '/test', method='HEAD')
511        self.check_status_and_reason(response, HTTPStatus.OK)
512        self.assertEqual(response.getheader('content-length'),
513                         str(len(self.data)))
514        self.assertEqual(response.getheader('content-type'),
515                         'application/octet-stream')
516
517    def test_browser_cache(self):
518        """Check that when a request to /test is sent with the request header
519        If-Modified-Since set to date of last modification, the server returns
520        status code 304, not 200
521        """
522        headers = email.message.Message()
523        headers['If-Modified-Since'] = self.last_modif_header
524        response = self.request(self.base_url + '/test', headers=headers)
525        self.check_status_and_reason(response, HTTPStatus.NOT_MODIFIED)
526
527        # one hour after last modification : must return 304
528        new_dt = self.last_modif_datetime + datetime.timedelta(hours=1)
529        headers = email.message.Message()
530        headers['If-Modified-Since'] = email.utils.format_datetime(new_dt,
531            usegmt=True)
532        response = self.request(self.base_url + '/test', headers=headers)
533        self.check_status_and_reason(response, HTTPStatus.NOT_MODIFIED)
534
535    def test_browser_cache_file_changed(self):
536        # with If-Modified-Since earlier than Last-Modified, must return 200
537        dt = self.last_modif_datetime
538        # build datetime object : 365 days before last modification
539        old_dt = dt - datetime.timedelta(days=365)
540        headers = email.message.Message()
541        headers['If-Modified-Since'] = email.utils.format_datetime(old_dt,
542            usegmt=True)
543        response = self.request(self.base_url + '/test', headers=headers)
544        self.check_status_and_reason(response, HTTPStatus.OK)
545
546    def test_browser_cache_with_If_None_Match_header(self):
547        # if If-None-Match header is present, ignore If-Modified-Since
548
549        headers = email.message.Message()
550        headers['If-Modified-Since'] = self.last_modif_header
551        headers['If-None-Match'] = "*"
552        response = self.request(self.base_url + '/test', headers=headers)
553        self.check_status_and_reason(response, HTTPStatus.OK)
554
555    def test_invalid_requests(self):
556        response = self.request('/', method='FOO')
557        self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)
558        # requests must be case sensitive,so this should fail too
559        response = self.request('/', method='custom')
560        self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)
561        response = self.request('/', method='GETs')
562        self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)
563
564    def test_last_modified(self):
565        """Checks that the datetime returned in Last-Modified response header
566        is the actual datetime of last modification, rounded to the second
567        """
568        response = self.request(self.base_url + '/test')
569        self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
570        last_modif_header = response.headers['Last-modified']
571        self.assertEqual(last_modif_header, self.last_modif_header)
572
573    def test_path_without_leading_slash(self):
574        response = self.request(self.tempdir_name + '/test')
575        self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
576        response = self.request(self.tempdir_name + '/test/')
577        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
578        response = self.request(self.tempdir_name + '/')
579        self.check_status_and_reason(response, HTTPStatus.OK)
580        response = self.request(self.tempdir_name)
581        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
582        response = self.request(self.tempdir_name + '/?hi=2')
583        self.check_status_and_reason(response, HTTPStatus.OK)
584        response = self.request(self.tempdir_name + '?hi=1')
585        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
586        self.assertEqual(response.getheader("Location"),
587                         self.tempdir_name + "/?hi=1")
588
589    def test_html_escape_filename(self):
590        filename = '<test&>.txt'
591        fullpath = os.path.join(self.tempdir, filename)
592
593        try:
594            open(fullpath, 'wb').close()
595        except OSError:
596            raise unittest.SkipTest('Can not create file %s on current file '
597                                    'system' % filename)
598
599        try:
600            response = self.request(self.base_url + '/')
601            body = self.check_status_and_reason(response, HTTPStatus.OK)
602            enc = response.headers.get_content_charset()
603        finally:
604            os.unlink(fullpath)  # avoid affecting test_undecodable_filename
605
606        self.assertIsNotNone(enc)
607        html_text = '>%s<' % html.escape(filename, quote=False)
608        self.assertIn(html_text.encode(enc), body)
609
610
611cgi_file1 = """\
612#!%s
613
614print("Content-type: text/html")
615print()
616print("Hello World")
617"""
618
619cgi_file2 = """\
620#!%s
621import cgi
622
623print("Content-type: text/html")
624print()
625
626form = cgi.FieldStorage()
627print("%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"),
628                          form.getfirst("bacon")))
629"""
630
631cgi_file4 = """\
632#!%s
633import os
634
635print("Content-type: text/html")
636print()
637
638print(os.environ["%s"])
639"""
640
641cgi_file6 = """\
642#!%s
643import os
644
645print("X-ambv: was here")
646print("Content-type: text/html")
647print()
648print("<pre>")
649for k, v in os.environ.items():
650    try:
651        k.encode('ascii')
652        v.encode('ascii')
653    except UnicodeEncodeError:
654        continue  # see: BPO-44647
655    print(f"{k}={v}")
656print("</pre>")
657"""
658
659
660@unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
661        "This test can't be run reliably as root (issue #13308).")
662class CGIHTTPServerTestCase(BaseTestCase):
663    class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler):
664        pass
665
666    linesep = os.linesep.encode('ascii')
667
668    def setUp(self):
669        BaseTestCase.setUp(self)
670        self.cwd = os.getcwd()
671        self.parent_dir = tempfile.mkdtemp()
672        self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin')
673        self.cgi_child_dir = os.path.join(self.cgi_dir, 'child-dir')
674        self.sub_dir_1 = os.path.join(self.parent_dir, 'sub')
675        self.sub_dir_2 = os.path.join(self.sub_dir_1, 'dir')
676        self.cgi_dir_in_sub_dir = os.path.join(self.sub_dir_2, 'cgi-bin')
677        os.mkdir(self.cgi_dir)
678        os.mkdir(self.cgi_child_dir)
679        os.mkdir(self.sub_dir_1)
680        os.mkdir(self.sub_dir_2)
681        os.mkdir(self.cgi_dir_in_sub_dir)
682        self.nocgi_path = None
683        self.file1_path = None
684        self.file2_path = None
685        self.file3_path = None
686        self.file4_path = None
687        self.file5_path = None
688
689        # The shebang line should be pure ASCII: use symlink if possible.
690        # See issue #7668.
691        self._pythonexe_symlink = None
692        if os_helper.can_symlink():
693            self.pythonexe = os.path.join(self.parent_dir, 'python')
694            self._pythonexe_symlink = support.PythonSymlink(self.pythonexe).__enter__()
695        else:
696            self.pythonexe = sys.executable
697
698        try:
699            # The python executable path is written as the first line of the
700            # CGI Python script. The encoding cookie cannot be used, and so the
701            # path should be encodable to the default script encoding (utf-8)
702            self.pythonexe.encode('utf-8')
703        except UnicodeEncodeError:
704            self.tearDown()
705            self.skipTest("Python executable path is not encodable to utf-8")
706
707        self.nocgi_path = os.path.join(self.parent_dir, 'nocgi.py')
708        with open(self.nocgi_path, 'w', encoding='utf-8') as fp:
709            fp.write(cgi_file1 % self.pythonexe)
710        os.chmod(self.nocgi_path, 0o777)
711
712        self.file1_path = os.path.join(self.cgi_dir, 'file1.py')
713        with open(self.file1_path, 'w', encoding='utf-8') as file1:
714            file1.write(cgi_file1 % self.pythonexe)
715        os.chmod(self.file1_path, 0o777)
716
717        self.file2_path = os.path.join(self.cgi_dir, 'file2.py')
718        with open(self.file2_path, 'w', encoding='utf-8') as file2:
719            file2.write(cgi_file2 % self.pythonexe)
720        os.chmod(self.file2_path, 0o777)
721
722        self.file3_path = os.path.join(self.cgi_child_dir, 'file3.py')
723        with open(self.file3_path, 'w', encoding='utf-8') as file3:
724            file3.write(cgi_file1 % self.pythonexe)
725        os.chmod(self.file3_path, 0o777)
726
727        self.file4_path = os.path.join(self.cgi_dir, 'file4.py')
728        with open(self.file4_path, 'w', encoding='utf-8') as file4:
729            file4.write(cgi_file4 % (self.pythonexe, 'QUERY_STRING'))
730        os.chmod(self.file4_path, 0o777)
731
732        self.file5_path = os.path.join(self.cgi_dir_in_sub_dir, 'file5.py')
733        with open(self.file5_path, 'w', encoding='utf-8') as file5:
734            file5.write(cgi_file1 % self.pythonexe)
735        os.chmod(self.file5_path, 0o777)
736
737        self.file6_path = os.path.join(self.cgi_dir, 'file6.py')
738        with open(self.file6_path, 'w', encoding='utf-8') as file6:
739            file6.write(cgi_file6 % self.pythonexe)
740        os.chmod(self.file6_path, 0o777)
741
742        os.chdir(self.parent_dir)
743
744    def tearDown(self):
745        try:
746            os.chdir(self.cwd)
747            if self._pythonexe_symlink:
748                self._pythonexe_symlink.__exit__(None, None, None)
749            if self.nocgi_path:
750                os.remove(self.nocgi_path)
751            if self.file1_path:
752                os.remove(self.file1_path)
753            if self.file2_path:
754                os.remove(self.file2_path)
755            if self.file3_path:
756                os.remove(self.file3_path)
757            if self.file4_path:
758                os.remove(self.file4_path)
759            if self.file5_path:
760                os.remove(self.file5_path)
761            if self.file6_path:
762                os.remove(self.file6_path)
763            os.rmdir(self.cgi_child_dir)
764            os.rmdir(self.cgi_dir)
765            os.rmdir(self.cgi_dir_in_sub_dir)
766            os.rmdir(self.sub_dir_2)
767            os.rmdir(self.sub_dir_1)
768            os.rmdir(self.parent_dir)
769        finally:
770            BaseTestCase.tearDown(self)
771
772    def test_url_collapse_path(self):
773        # verify tail is the last portion and head is the rest on proper urls
774        test_vectors = {
775            '': '//',
776            '..': IndexError,
777            '/.//..': IndexError,
778            '/': '//',
779            '//': '//',
780            '/\\': '//\\',
781            '/.//': '//',
782            'cgi-bin/file1.py': '/cgi-bin/file1.py',
783            '/cgi-bin/file1.py': '/cgi-bin/file1.py',
784            'a': '//a',
785            '/a': '//a',
786            '//a': '//a',
787            './a': '//a',
788            './C:/': '/C:/',
789            '/a/b': '/a/b',
790            '/a/b/': '/a/b/',
791            '/a/b/.': '/a/b/',
792            '/a/b/c/..': '/a/b/',
793            '/a/b/c/../d': '/a/b/d',
794            '/a/b/c/../d/e/../f': '/a/b/d/f',
795            '/a/b/c/../d/e/../../f': '/a/b/f',
796            '/a/b/c/../d/e/.././././..//f': '/a/b/f',
797            '../a/b/c/../d/e/.././././..//f': IndexError,
798            '/a/b/c/../d/e/../../../f': '/a/f',
799            '/a/b/c/../d/e/../../../../f': '//f',
800            '/a/b/c/../d/e/../../../../../f': IndexError,
801            '/a/b/c/../d/e/../../../../f/..': '//',
802            '/a/b/c/../d/e/../../../../f/../.': '//',
803        }
804        for path, expected in test_vectors.items():
805            if isinstance(expected, type) and issubclass(expected, Exception):
806                self.assertRaises(expected,
807                                  server._url_collapse_path, path)
808            else:
809                actual = server._url_collapse_path(path)
810                self.assertEqual(expected, actual,
811                                 msg='path = %r\nGot:    %r\nWanted: %r' %
812                                 (path, actual, expected))
813
814    def test_headers_and_content(self):
815        res = self.request('/cgi-bin/file1.py')
816        self.assertEqual(
817            (res.read(), res.getheader('Content-type'), res.status),
818            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK))
819
820    def test_issue19435(self):
821        res = self.request('///////////nocgi.py/../cgi-bin/nothere.sh')
822        self.assertEqual(res.status, HTTPStatus.NOT_FOUND)
823
824    def test_post(self):
825        params = urllib.parse.urlencode(
826            {'spam' : 1, 'eggs' : 'python', 'bacon' : 123456})
827        headers = {'Content-type' : 'application/x-www-form-urlencoded'}
828        res = self.request('/cgi-bin/file2.py', 'POST', params, headers)
829
830        self.assertEqual(res.read(), b'1, python, 123456' + self.linesep)
831
832    def test_invaliduri(self):
833        res = self.request('/cgi-bin/invalid')
834        res.read()
835        self.assertEqual(res.status, HTTPStatus.NOT_FOUND)
836
837    def test_authorization(self):
838        headers = {b'Authorization' : b'Basic ' +
839                   base64.b64encode(b'username:pass')}
840        res = self.request('/cgi-bin/file1.py', 'GET', headers=headers)
841        self.assertEqual(
842            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
843            (res.read(), res.getheader('Content-type'), res.status))
844
845    def test_no_leading_slash(self):
846        # http://bugs.python.org/issue2254
847        res = self.request('cgi-bin/file1.py')
848        self.assertEqual(
849            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
850            (res.read(), res.getheader('Content-type'), res.status))
851
852    def test_os_environ_is_not_altered(self):
853        signature = "Test CGI Server"
854        os.environ['SERVER_SOFTWARE'] = signature
855        res = self.request('/cgi-bin/file1.py')
856        self.assertEqual(
857            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
858            (res.read(), res.getheader('Content-type'), res.status))
859        self.assertEqual(os.environ['SERVER_SOFTWARE'], signature)
860
861    def test_urlquote_decoding_in_cgi_check(self):
862        res = self.request('/cgi-bin%2ffile1.py')
863        self.assertEqual(
864            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
865            (res.read(), res.getheader('Content-type'), res.status))
866
867    def test_nested_cgi_path_issue21323(self):
868        res = self.request('/cgi-bin/child-dir/file3.py')
869        self.assertEqual(
870            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
871            (res.read(), res.getheader('Content-type'), res.status))
872
873    def test_query_with_multiple_question_mark(self):
874        res = self.request('/cgi-bin/file4.py?a=b?c=d')
875        self.assertEqual(
876            (b'a=b?c=d' + self.linesep, 'text/html', HTTPStatus.OK),
877            (res.read(), res.getheader('Content-type'), res.status))
878
879    def test_query_with_continuous_slashes(self):
880        res = self.request('/cgi-bin/file4.py?k=aa%2F%2Fbb&//q//p//=//a//b//')
881        self.assertEqual(
882            (b'k=aa%2F%2Fbb&//q//p//=//a//b//' + self.linesep,
883             'text/html', HTTPStatus.OK),
884            (res.read(), res.getheader('Content-type'), res.status))
885
886    def test_cgi_path_in_sub_directories(self):
887        try:
888            CGIHTTPRequestHandler.cgi_directories.append('/sub/dir/cgi-bin')
889            res = self.request('/sub/dir/cgi-bin/file5.py')
890            self.assertEqual(
891                (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
892                (res.read(), res.getheader('Content-type'), res.status))
893        finally:
894            CGIHTTPRequestHandler.cgi_directories.remove('/sub/dir/cgi-bin')
895
896    def test_accept(self):
897        browser_accept = \
898                    'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8'
899        tests = (
900            ((('Accept', browser_accept),), browser_accept),
901            ((), ''),
902            # Hack case to get two values for the one header
903            ((('Accept', 'text/html'), ('ACCEPT', 'text/plain')),
904               'text/html,text/plain'),
905        )
906        for headers, expected in tests:
907            headers = OrderedDict(headers)
908            with self.subTest(headers):
909                res = self.request('/cgi-bin/file6.py', 'GET', headers=headers)
910                self.assertEqual(http.HTTPStatus.OK, res.status)
911                expected = f"HTTP_ACCEPT={expected}".encode('ascii')
912                self.assertIn(expected, res.read())
913
914
915class SocketlessRequestHandler(SimpleHTTPRequestHandler):
916    def __init__(self, directory=None):
917        request = mock.Mock()
918        request.makefile.return_value = BytesIO()
919        super().__init__(request, None, None, directory=directory)
920
921        self.get_called = False
922        self.protocol_version = "HTTP/1.1"
923
924    def do_GET(self):
925        self.get_called = True
926        self.send_response(HTTPStatus.OK)
927        self.send_header('Content-Type', 'text/html')
928        self.end_headers()
929        self.wfile.write(b'<html><body>Data</body></html>\r\n')
930
931    def log_message(self, format, *args):
932        pass
933
934class RejectingSocketlessRequestHandler(SocketlessRequestHandler):
935    def handle_expect_100(self):
936        self.send_error(HTTPStatus.EXPECTATION_FAILED)
937        return False
938
939
940class AuditableBytesIO:
941
942    def __init__(self):
943        self.datas = []
944
945    def write(self, data):
946        self.datas.append(data)
947
948    def getData(self):
949        return b''.join(self.datas)
950
951    @property
952    def numWrites(self):
953        return len(self.datas)
954
955
956class BaseHTTPRequestHandlerTestCase(unittest.TestCase):
957    """Test the functionality of the BaseHTTPServer.
958
959       Test the support for the Expect 100-continue header.
960       """
961
962    HTTPResponseMatch = re.compile(b'HTTP/1.[0-9]+ 200 OK')
963
964    def setUp (self):
965        self.handler = SocketlessRequestHandler()
966
967    def send_typical_request(self, message):
968        input = BytesIO(message)
969        output = BytesIO()
970        self.handler.rfile = input
971        self.handler.wfile = output
972        self.handler.handle_one_request()
973        output.seek(0)
974        return output.readlines()
975
976    def verify_get_called(self):
977        self.assertTrue(self.handler.get_called)
978
979    def verify_expected_headers(self, headers):
980        for fieldName in b'Server: ', b'Date: ', b'Content-Type: ':
981            self.assertEqual(sum(h.startswith(fieldName) for h in headers), 1)
982
983    def verify_http_server_response(self, response):
984        match = self.HTTPResponseMatch.search(response)
985        self.assertIsNotNone(match)
986
987    def test_http_1_1(self):
988        result = self.send_typical_request(b'GET / HTTP/1.1\r\n\r\n')
989        self.verify_http_server_response(result[0])
990        self.verify_expected_headers(result[1:-1])
991        self.verify_get_called()
992        self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
993        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
994        self.assertEqual(self.handler.command, 'GET')
995        self.assertEqual(self.handler.path, '/')
996        self.assertEqual(self.handler.request_version, 'HTTP/1.1')
997        self.assertSequenceEqual(self.handler.headers.items(), ())
998
999    def test_http_1_0(self):
1000        result = self.send_typical_request(b'GET / HTTP/1.0\r\n\r\n')
1001        self.verify_http_server_response(result[0])
1002        self.verify_expected_headers(result[1:-1])
1003        self.verify_get_called()
1004        self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
1005        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.0')
1006        self.assertEqual(self.handler.command, 'GET')
1007        self.assertEqual(self.handler.path, '/')
1008        self.assertEqual(self.handler.request_version, 'HTTP/1.0')
1009        self.assertSequenceEqual(self.handler.headers.items(), ())
1010
1011    def test_http_0_9(self):
1012        result = self.send_typical_request(b'GET / HTTP/0.9\r\n\r\n')
1013        self.assertEqual(len(result), 1)
1014        self.assertEqual(result[0], b'<html><body>Data</body></html>\r\n')
1015        self.verify_get_called()
1016
1017    def test_extra_space(self):
1018        result = self.send_typical_request(
1019            b'GET /spaced out HTTP/1.1\r\n'
1020            b'Host: dummy\r\n'
1021            b'\r\n'
1022        )
1023        self.assertTrue(result[0].startswith(b'HTTP/1.1 400 '))
1024        self.verify_expected_headers(result[1:result.index(b'\r\n')])
1025        self.assertFalse(self.handler.get_called)
1026
1027    def test_with_continue_1_0(self):
1028        result = self.send_typical_request(b'GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n')
1029        self.verify_http_server_response(result[0])
1030        self.verify_expected_headers(result[1:-1])
1031        self.verify_get_called()
1032        self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
1033        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.0')
1034        self.assertEqual(self.handler.command, 'GET')
1035        self.assertEqual(self.handler.path, '/')
1036        self.assertEqual(self.handler.request_version, 'HTTP/1.0')
1037        headers = (("Expect", "100-continue"),)
1038        self.assertSequenceEqual(self.handler.headers.items(), headers)
1039
1040    def test_with_continue_1_1(self):
1041        result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
1042        self.assertEqual(result[0], b'HTTP/1.1 100 Continue\r\n')
1043        self.assertEqual(result[1], b'\r\n')
1044        self.assertEqual(result[2], b'HTTP/1.1 200 OK\r\n')
1045        self.verify_expected_headers(result[2:-1])
1046        self.verify_get_called()
1047        self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
1048        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
1049        self.assertEqual(self.handler.command, 'GET')
1050        self.assertEqual(self.handler.path, '/')
1051        self.assertEqual(self.handler.request_version, 'HTTP/1.1')
1052        headers = (("Expect", "100-continue"),)
1053        self.assertSequenceEqual(self.handler.headers.items(), headers)
1054
1055    def test_header_buffering_of_send_error(self):
1056
1057        input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
1058        output = AuditableBytesIO()
1059        handler = SocketlessRequestHandler()
1060        handler.rfile = input
1061        handler.wfile = output
1062        handler.request_version = 'HTTP/1.1'
1063        handler.requestline = ''
1064        handler.command = None
1065
1066        handler.send_error(418)
1067        self.assertEqual(output.numWrites, 2)
1068
1069    def test_header_buffering_of_send_response_only(self):
1070
1071        input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
1072        output = AuditableBytesIO()
1073        handler = SocketlessRequestHandler()
1074        handler.rfile = input
1075        handler.wfile = output
1076        handler.request_version = 'HTTP/1.1'
1077
1078        handler.send_response_only(418)
1079        self.assertEqual(output.numWrites, 0)
1080        handler.end_headers()
1081        self.assertEqual(output.numWrites, 1)
1082
1083    def test_header_buffering_of_send_header(self):
1084
1085        input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
1086        output = AuditableBytesIO()
1087        handler = SocketlessRequestHandler()
1088        handler.rfile = input
1089        handler.wfile = output
1090        handler.request_version = 'HTTP/1.1'
1091
1092        handler.send_header('Foo', 'foo')
1093        handler.send_header('bar', 'bar')
1094        self.assertEqual(output.numWrites, 0)
1095        handler.end_headers()
1096        self.assertEqual(output.getData(), b'Foo: foo\r\nbar: bar\r\n\r\n')
1097        self.assertEqual(output.numWrites, 1)
1098
1099    def test_header_unbuffered_when_continue(self):
1100
1101        def _readAndReseek(f):
1102            pos = f.tell()
1103            f.seek(0)
1104            data = f.read()
1105            f.seek(pos)
1106            return data
1107
1108        input = BytesIO(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
1109        output = BytesIO()
1110        self.handler.rfile = input
1111        self.handler.wfile = output
1112        self.handler.request_version = 'HTTP/1.1'
1113
1114        self.handler.handle_one_request()
1115        self.assertNotEqual(_readAndReseek(output), b'')
1116        result = _readAndReseek(output).split(b'\r\n')
1117        self.assertEqual(result[0], b'HTTP/1.1 100 Continue')
1118        self.assertEqual(result[1], b'')
1119        self.assertEqual(result[2], b'HTTP/1.1 200 OK')
1120
1121    def test_with_continue_rejected(self):
1122        usual_handler = self.handler        # Save to avoid breaking any subsequent tests.
1123        self.handler = RejectingSocketlessRequestHandler()
1124        result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
1125        self.assertEqual(result[0], b'HTTP/1.1 417 Expectation Failed\r\n')
1126        self.verify_expected_headers(result[1:-1])
1127        # The expect handler should short circuit the usual get method by
1128        # returning false here, so get_called should be false
1129        self.assertFalse(self.handler.get_called)
1130        self.assertEqual(sum(r == b'Connection: close\r\n' for r in result[1:-1]), 1)
1131        self.handler = usual_handler        # Restore to avoid breaking any subsequent tests.
1132
1133    def test_request_length(self):
1134        # Issue #10714: huge request lines are discarded, to avoid Denial
1135        # of Service attacks.
1136        result = self.send_typical_request(b'GET ' + b'x' * 65537)
1137        self.assertEqual(result[0], b'HTTP/1.1 414 Request-URI Too Long\r\n')
1138        self.assertFalse(self.handler.get_called)
1139        self.assertIsInstance(self.handler.requestline, str)
1140
1141    def test_header_length(self):
1142        # Issue #6791: same for headers
1143        result = self.send_typical_request(
1144            b'GET / HTTP/1.1\r\nX-Foo: bar' + b'r' * 65537 + b'\r\n\r\n')
1145        self.assertEqual(result[0], b'HTTP/1.1 431 Line too long\r\n')
1146        self.assertFalse(self.handler.get_called)
1147        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
1148
1149    def test_too_many_headers(self):
1150        result = self.send_typical_request(
1151            b'GET / HTTP/1.1\r\n' + b'X-Foo: bar\r\n' * 101 + b'\r\n')
1152        self.assertEqual(result[0], b'HTTP/1.1 431 Too many headers\r\n')
1153        self.assertFalse(self.handler.get_called)
1154        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
1155
1156    def test_html_escape_on_error(self):
1157        result = self.send_typical_request(
1158            b'<script>alert("hello")</script> / HTTP/1.1')
1159        result = b''.join(result)
1160        text = '<script>alert("hello")</script>'
1161        self.assertIn(html.escape(text, quote=False).encode('ascii'), result)
1162
1163    def test_close_connection(self):
1164        # handle_one_request() should be repeatedly called until
1165        # it sets close_connection
1166        def handle_one_request():
1167            self.handler.close_connection = next(close_values)
1168        self.handler.handle_one_request = handle_one_request
1169
1170        close_values = iter((True,))
1171        self.handler.handle()
1172        self.assertRaises(StopIteration, next, close_values)
1173
1174        close_values = iter((False, False, True))
1175        self.handler.handle()
1176        self.assertRaises(StopIteration, next, close_values)
1177
1178    def test_date_time_string(self):
1179        now = time.time()
1180        # this is the old code that formats the timestamp
1181        year, month, day, hh, mm, ss, wd, y, z = time.gmtime(now)
1182        expected = "%s, %02d %3s %4d %02d:%02d:%02d GMT" % (
1183            self.handler.weekdayname[wd],
1184            day,
1185            self.handler.monthname[month],
1186            year, hh, mm, ss
1187        )
1188        self.assertEqual(self.handler.date_time_string(timestamp=now), expected)
1189
1190
1191class SimpleHTTPRequestHandlerTestCase(unittest.TestCase):
1192    """ Test url parsing """
1193    def setUp(self):
1194        self.translated_1 = os.path.join(os.getcwd(), 'filename')
1195        self.translated_2 = os.path.join('foo', 'filename')
1196        self.translated_3 = os.path.join('bar', 'filename')
1197        self.handler_1 = SocketlessRequestHandler()
1198        self.handler_2 = SocketlessRequestHandler(directory='foo')
1199        self.handler_3 = SocketlessRequestHandler(directory=pathlib.PurePath('bar'))
1200
1201    def test_query_arguments(self):
1202        path = self.handler_1.translate_path('/filename')
1203        self.assertEqual(path, self.translated_1)
1204        path = self.handler_2.translate_path('/filename')
1205        self.assertEqual(path, self.translated_2)
1206        path = self.handler_3.translate_path('/filename')
1207        self.assertEqual(path, self.translated_3)
1208
1209        path = self.handler_1.translate_path('/filename?foo=bar')
1210        self.assertEqual(path, self.translated_1)
1211        path = self.handler_2.translate_path('/filename?foo=bar')
1212        self.assertEqual(path, self.translated_2)
1213        path = self.handler_3.translate_path('/filename?foo=bar')
1214        self.assertEqual(path, self.translated_3)
1215
1216        path = self.handler_1.translate_path('/filename?a=b&spam=eggs#zot')
1217        self.assertEqual(path, self.translated_1)
1218        path = self.handler_2.translate_path('/filename?a=b&spam=eggs#zot')
1219        self.assertEqual(path, self.translated_2)
1220        path = self.handler_3.translate_path('/filename?a=b&spam=eggs#zot')
1221        self.assertEqual(path, self.translated_3)
1222
1223    def test_start_with_double_slash(self):
1224        path = self.handler_1.translate_path('//filename')
1225        self.assertEqual(path, self.translated_1)
1226        path = self.handler_2.translate_path('//filename')
1227        self.assertEqual(path, self.translated_2)
1228        path = self.handler_3.translate_path('//filename')
1229        self.assertEqual(path, self.translated_3)
1230
1231        path = self.handler_1.translate_path('//filename?foo=bar')
1232        self.assertEqual(path, self.translated_1)
1233        path = self.handler_2.translate_path('//filename?foo=bar')
1234        self.assertEqual(path, self.translated_2)
1235        path = self.handler_3.translate_path('//filename?foo=bar')
1236        self.assertEqual(path, self.translated_3)
1237
1238    def test_windows_colon(self):
1239        with support.swap_attr(server.os, 'path', ntpath):
1240            path = self.handler_1.translate_path('c:c:c:foo/filename')
1241            path = path.replace(ntpath.sep, os.sep)
1242            self.assertEqual(path, self.translated_1)
1243            path = self.handler_2.translate_path('c:c:c:foo/filename')
1244            path = path.replace(ntpath.sep, os.sep)
1245            self.assertEqual(path, self.translated_2)
1246            path = self.handler_3.translate_path('c:c:c:foo/filename')
1247            path = path.replace(ntpath.sep, os.sep)
1248            self.assertEqual(path, self.translated_3)
1249
1250            path = self.handler_1.translate_path('\\c:../filename')
1251            path = path.replace(ntpath.sep, os.sep)
1252            self.assertEqual(path, self.translated_1)
1253            path = self.handler_2.translate_path('\\c:../filename')
1254            path = path.replace(ntpath.sep, os.sep)
1255            self.assertEqual(path, self.translated_2)
1256            path = self.handler_3.translate_path('\\c:../filename')
1257            path = path.replace(ntpath.sep, os.sep)
1258            self.assertEqual(path, self.translated_3)
1259
1260            path = self.handler_1.translate_path('c:\\c:..\\foo/filename')
1261            path = path.replace(ntpath.sep, os.sep)
1262            self.assertEqual(path, self.translated_1)
1263            path = self.handler_2.translate_path('c:\\c:..\\foo/filename')
1264            path = path.replace(ntpath.sep, os.sep)
1265            self.assertEqual(path, self.translated_2)
1266            path = self.handler_3.translate_path('c:\\c:..\\foo/filename')
1267            path = path.replace(ntpath.sep, os.sep)
1268            self.assertEqual(path, self.translated_3)
1269
1270            path = self.handler_1.translate_path('c:c:foo\\c:c:bar/filename')
1271            path = path.replace(ntpath.sep, os.sep)
1272            self.assertEqual(path, self.translated_1)
1273            path = self.handler_2.translate_path('c:c:foo\\c:c:bar/filename')
1274            path = path.replace(ntpath.sep, os.sep)
1275            self.assertEqual(path, self.translated_2)
1276            path = self.handler_3.translate_path('c:c:foo\\c:c:bar/filename')
1277            path = path.replace(ntpath.sep, os.sep)
1278            self.assertEqual(path, self.translated_3)
1279
1280
1281class MiscTestCase(unittest.TestCase):
1282    def test_all(self):
1283        expected = []
1284        denylist = {'executable', 'nobody_uid', 'test'}
1285        for name in dir(server):
1286            if name.startswith('_') or name in denylist:
1287                continue
1288            module_object = getattr(server, name)
1289            if getattr(module_object, '__module__', None) == 'http.server':
1290                expected.append(name)
1291        self.assertCountEqual(server.__all__, expected)
1292
1293
1294class ScriptTestCase(unittest.TestCase):
1295
1296    def mock_server_class(self):
1297        return mock.MagicMock(
1298            return_value=mock.MagicMock(
1299                __enter__=mock.MagicMock(
1300                    return_value=mock.MagicMock(
1301                        socket=mock.MagicMock(
1302                            getsockname=lambda: ('', 0),
1303                        ),
1304                    ),
1305                ),
1306            ),
1307        )
1308
1309    @mock.patch('builtins.print')
1310    def test_server_test_unspec(self, _):
1311        mock_server = self.mock_server_class()
1312        server.test(ServerClass=mock_server, bind=None)
1313        self.assertIn(
1314            mock_server.address_family,
1315            (socket.AF_INET6, socket.AF_INET),
1316        )
1317
1318    @mock.patch('builtins.print')
1319    def test_server_test_localhost(self, _):
1320        mock_server = self.mock_server_class()
1321        server.test(ServerClass=mock_server, bind="localhost")
1322        self.assertIn(
1323            mock_server.address_family,
1324            (socket.AF_INET6, socket.AF_INET),
1325        )
1326
1327    ipv6_addrs = (
1328        "::",
1329        "2001:0db8:85a3:0000:0000:8a2e:0370:7334",
1330        "::1",
1331    )
1332
1333    ipv4_addrs = (
1334        "0.0.0.0",
1335        "8.8.8.8",
1336        "127.0.0.1",
1337    )
1338
1339    @mock.patch('builtins.print')
1340    def test_server_test_ipv6(self, _):
1341        for bind in self.ipv6_addrs:
1342            mock_server = self.mock_server_class()
1343            server.test(ServerClass=mock_server, bind=bind)
1344            self.assertEqual(mock_server.address_family, socket.AF_INET6)
1345
1346    @mock.patch('builtins.print')
1347    def test_server_test_ipv4(self, _):
1348        for bind in self.ipv4_addrs:
1349            mock_server = self.mock_server_class()
1350            server.test(ServerClass=mock_server, bind=bind)
1351            self.assertEqual(mock_server.address_family, socket.AF_INET)
1352
1353
1354def setUpModule():
1355    unittest.addModuleCleanup(os.chdir, os.getcwd())
1356
1357
1358if __name__ == '__main__':
1359    unittest.main()
1360