• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Unittests for the various HTTPServer modules.
2
3Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>,
4Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.
5"""
6
7from http.server import BaseHTTPRequestHandler, HTTPServer, \
8     SimpleHTTPRequestHandler, CGIHTTPRequestHandler
9from http import server, HTTPStatus
10
11import os
12import sys
13import re
14import base64
15import ntpath
16import shutil
17import email.message
18import email.utils
19import html
20import http.client
21import urllib.parse
22import tempfile
23import time
24import datetime
25import threading
26from unittest import mock
27from io import BytesIO
28
29import unittest
30from test import support
31
32
33class NoLogRequestHandler:
34    def log_message(self, *args):
35        # don't write log messages to stderr
36        pass
37
38    def read(self, n=None):
39        return ''
40
41
42class TestServerThread(threading.Thread):
43    def __init__(self, test_object, request_handler):
44        threading.Thread.__init__(self)
45        self.request_handler = request_handler
46        self.test_object = test_object
47
48    def run(self):
49        self.server = HTTPServer(('localhost', 0), self.request_handler)
50        self.test_object.HOST, self.test_object.PORT = self.server.socket.getsockname()
51        self.test_object.server_started.set()
52        self.test_object = None
53        try:
54            self.server.serve_forever(0.05)
55        finally:
56            self.server.server_close()
57
58    def stop(self):
59        self.server.shutdown()
60        self.join()
61
62
63class BaseTestCase(unittest.TestCase):
64    def setUp(self):
65        self._threads = support.threading_setup()
66        os.environ = support.EnvironmentVarGuard()
67        self.server_started = threading.Event()
68        self.thread = TestServerThread(self, self.request_handler)
69        self.thread.start()
70        self.server_started.wait()
71
72    def tearDown(self):
73        self.thread.stop()
74        self.thread = None
75        os.environ.__exit__()
76        support.threading_cleanup(*self._threads)
77
78    def request(self, uri, method='GET', body=None, headers={}):
79        self.connection = http.client.HTTPConnection(self.HOST, self.PORT)
80        self.connection.request(method, uri, body, headers)
81        return self.connection.getresponse()
82
83
84class BaseHTTPServerTestCase(BaseTestCase):
85    class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler):
86        protocol_version = 'HTTP/1.1'
87        default_request_version = 'HTTP/1.1'
88
89        def do_TEST(self):
90            self.send_response(HTTPStatus.NO_CONTENT)
91            self.send_header('Content-Type', 'text/html')
92            self.send_header('Connection', 'close')
93            self.end_headers()
94
95        def do_KEEP(self):
96            self.send_response(HTTPStatus.NO_CONTENT)
97            self.send_header('Content-Type', 'text/html')
98            self.send_header('Connection', 'keep-alive')
99            self.end_headers()
100
101        def do_KEYERROR(self):
102            self.send_error(999)
103
104        def do_NOTFOUND(self):
105            self.send_error(HTTPStatus.NOT_FOUND)
106
107        def do_EXPLAINERROR(self):
108            self.send_error(999, "Short Message",
109                            "This is a long \n explanation")
110
111        def do_CUSTOM(self):
112            self.send_response(999)
113            self.send_header('Content-Type', 'text/html')
114            self.send_header('Connection', 'close')
115            self.end_headers()
116
117        def do_LATINONEHEADER(self):
118            self.send_response(999)
119            self.send_header('X-Special', 'Dängerous Mind')
120            self.send_header('Connection', 'close')
121            self.end_headers()
122            body = self.headers['x-special-incoming'].encode('utf-8')
123            self.wfile.write(body)
124
125        def do_SEND_ERROR(self):
126            self.send_error(int(self.path[1:]))
127
128        def do_HEAD(self):
129            self.send_error(int(self.path[1:]))
130
131    def setUp(self):
132        BaseTestCase.setUp(self)
133        self.con = http.client.HTTPConnection(self.HOST, self.PORT)
134        self.con.connect()
135
136    def test_command(self):
137        self.con.request('GET', '/')
138        res = self.con.getresponse()
139        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
140
141    def test_request_line_trimming(self):
142        self.con._http_vsn_str = 'HTTP/1.1\n'
143        self.con.putrequest('XYZBOGUS', '/')
144        self.con.endheaders()
145        res = self.con.getresponse()
146        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
147
148    def test_version_bogus(self):
149        self.con._http_vsn_str = 'FUBAR'
150        self.con.putrequest('GET', '/')
151        self.con.endheaders()
152        res = self.con.getresponse()
153        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
154
155    def test_version_digits(self):
156        self.con._http_vsn_str = 'HTTP/9.9.9'
157        self.con.putrequest('GET', '/')
158        self.con.endheaders()
159        res = self.con.getresponse()
160        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
161
162    def test_version_none_get(self):
163        self.con._http_vsn_str = ''
164        self.con.putrequest('GET', '/')
165        self.con.endheaders()
166        res = self.con.getresponse()
167        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
168
169    def test_version_none(self):
170        # Test that a valid method is rejected when not HTTP/1.x
171        self.con._http_vsn_str = ''
172        self.con.putrequest('CUSTOM', '/')
173        self.con.endheaders()
174        res = self.con.getresponse()
175        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
176
177    def test_version_invalid(self):
178        self.con._http_vsn = 99
179        self.con._http_vsn_str = 'HTTP/9.9'
180        self.con.putrequest('GET', '/')
181        self.con.endheaders()
182        res = self.con.getresponse()
183        self.assertEqual(res.status, HTTPStatus.HTTP_VERSION_NOT_SUPPORTED)
184
185    def test_send_blank(self):
186        self.con._http_vsn_str = ''
187        self.con.putrequest('', '')
188        self.con.endheaders()
189        res = self.con.getresponse()
190        self.assertEqual(res.status, HTTPStatus.BAD_REQUEST)
191
192    def test_header_close(self):
193        self.con.putrequest('GET', '/')
194        self.con.putheader('Connection', 'close')
195        self.con.endheaders()
196        res = self.con.getresponse()
197        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
198
199    def test_header_keep_alive(self):
200        self.con._http_vsn_str = 'HTTP/1.1'
201        self.con.putrequest('GET', '/')
202        self.con.putheader('Connection', 'keep-alive')
203        self.con.endheaders()
204        res = self.con.getresponse()
205        self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED)
206
207    def test_handler(self):
208        self.con.request('TEST', '/')
209        res = self.con.getresponse()
210        self.assertEqual(res.status, HTTPStatus.NO_CONTENT)
211
212    def test_return_header_keep_alive(self):
213        self.con.request('KEEP', '/')
214        res = self.con.getresponse()
215        self.assertEqual(res.getheader('Connection'), 'keep-alive')
216        self.con.request('TEST', '/')
217        self.addCleanup(self.con.close)
218
219    def test_internal_key_error(self):
220        self.con.request('KEYERROR', '/')
221        res = self.con.getresponse()
222        self.assertEqual(res.status, 999)
223
224    def test_return_custom_status(self):
225        self.con.request('CUSTOM', '/')
226        res = self.con.getresponse()
227        self.assertEqual(res.status, 999)
228
229    def test_return_explain_error(self):
230        self.con.request('EXPLAINERROR', '/')
231        res = self.con.getresponse()
232        self.assertEqual(res.status, 999)
233        self.assertTrue(int(res.getheader('Content-Length')))
234
235    def test_latin1_header(self):
236        self.con.request('LATINONEHEADER', '/', headers={
237            'X-Special-Incoming':       'Ärger mit Unicode'
238        })
239        res = self.con.getresponse()
240        self.assertEqual(res.getheader('X-Special'), 'Dängerous Mind')
241        self.assertEqual(res.read(), 'Ärger mit Unicode'.encode('utf-8'))
242
243    def test_error_content_length(self):
244        # Issue #16088: standard error responses should have a content-length
245        self.con.request('NOTFOUND', '/')
246        res = self.con.getresponse()
247        self.assertEqual(res.status, HTTPStatus.NOT_FOUND)
248
249        data = res.read()
250        self.assertEqual(int(res.getheader('Content-Length')), len(data))
251
252    def test_send_error(self):
253        allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED,
254                                         HTTPStatus.RESET_CONTENT)
255        for code in (HTTPStatus.NO_CONTENT, HTTPStatus.NOT_MODIFIED,
256                     HTTPStatus.PROCESSING, HTTPStatus.RESET_CONTENT,
257                     HTTPStatus.SWITCHING_PROTOCOLS):
258            self.con.request('SEND_ERROR', '/{}'.format(code))
259            res = self.con.getresponse()
260            self.assertEqual(code, res.status)
261            self.assertEqual(None, res.getheader('Content-Length'))
262            self.assertEqual(None, res.getheader('Content-Type'))
263            if code not in allow_transfer_encoding_codes:
264                self.assertEqual(None, res.getheader('Transfer-Encoding'))
265
266            data = res.read()
267            self.assertEqual(b'', data)
268
269    def test_head_via_send_error(self):
270        allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED,
271                                         HTTPStatus.RESET_CONTENT)
272        for code in (HTTPStatus.OK, HTTPStatus.NO_CONTENT,
273                     HTTPStatus.NOT_MODIFIED, HTTPStatus.RESET_CONTENT,
274                     HTTPStatus.SWITCHING_PROTOCOLS):
275            self.con.request('HEAD', '/{}'.format(code))
276            res = self.con.getresponse()
277            self.assertEqual(code, res.status)
278            if code == HTTPStatus.OK:
279                self.assertTrue(int(res.getheader('Content-Length')) > 0)
280                self.assertIn('text/html', res.getheader('Content-Type'))
281            else:
282                self.assertEqual(None, res.getheader('Content-Length'))
283                self.assertEqual(None, res.getheader('Content-Type'))
284            if code not in allow_transfer_encoding_codes:
285                self.assertEqual(None, res.getheader('Transfer-Encoding'))
286
287            data = res.read()
288            self.assertEqual(b'', data)
289
290
291class RequestHandlerLoggingTestCase(BaseTestCase):
292    class request_handler(BaseHTTPRequestHandler):
293        protocol_version = 'HTTP/1.1'
294        default_request_version = 'HTTP/1.1'
295
296        def do_GET(self):
297            self.send_response(HTTPStatus.OK)
298            self.end_headers()
299
300        def do_ERROR(self):
301            self.send_error(HTTPStatus.NOT_FOUND, 'File not found')
302
303    def test_get(self):
304        self.con = http.client.HTTPConnection(self.HOST, self.PORT)
305        self.con.connect()
306
307        with support.captured_stderr() as err:
308            self.con.request('GET', '/')
309            self.con.getresponse()
310
311        self.assertTrue(
312            err.getvalue().endswith('"GET / HTTP/1.1" 200 -\n'))
313
314    def test_err(self):
315        self.con = http.client.HTTPConnection(self.HOST, self.PORT)
316        self.con.connect()
317
318        with support.captured_stderr() as err:
319            self.con.request('ERROR', '/')
320            self.con.getresponse()
321
322        lines = err.getvalue().split('\n')
323        self.assertTrue(lines[0].endswith('code 404, message File not found'))
324        self.assertTrue(lines[1].endswith('"ERROR / HTTP/1.1" 404 -'))
325
326
327class SimpleHTTPServerTestCase(BaseTestCase):
328    class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler):
329        pass
330
331    def setUp(self):
332        BaseTestCase.setUp(self)
333        self.cwd = os.getcwd()
334        basetempdir = tempfile.gettempdir()
335        os.chdir(basetempdir)
336        self.data = b'We are the knights who say Ni!'
337        self.tempdir = tempfile.mkdtemp(dir=basetempdir)
338        self.tempdir_name = os.path.basename(self.tempdir)
339        self.base_url = '/' + self.tempdir_name
340        tempname = os.path.join(self.tempdir, 'test')
341        with open(tempname, 'wb') as temp:
342            temp.write(self.data)
343            temp.flush()
344        mtime = os.stat(tempname).st_mtime
345        # compute last modification datetime for browser cache tests
346        last_modif = datetime.datetime.fromtimestamp(mtime,
347            datetime.timezone.utc)
348        self.last_modif_datetime = last_modif.replace(microsecond=0)
349        self.last_modif_header = email.utils.formatdate(
350            last_modif.timestamp(), usegmt=True)
351
352    def tearDown(self):
353        try:
354            os.chdir(self.cwd)
355            try:
356                shutil.rmtree(self.tempdir)
357            except:
358                pass
359        finally:
360            BaseTestCase.tearDown(self)
361
362    def check_status_and_reason(self, response, status, data=None):
363        def close_conn():
364            """Don't close reader yet so we can check if there was leftover
365            buffered input"""
366            nonlocal reader
367            reader = response.fp
368            response.fp = None
369        reader = None
370        response._close_conn = close_conn
371
372        body = response.read()
373        self.assertTrue(response)
374        self.assertEqual(response.status, status)
375        self.assertIsNotNone(response.reason)
376        if data:
377            self.assertEqual(data, body)
378        # Ensure the server has not set up a persistent connection, and has
379        # not sent any extra data
380        self.assertEqual(response.version, 10)
381        self.assertEqual(response.msg.get("Connection", "close"), "close")
382        self.assertEqual(reader.read(30), b'', 'Connection should be closed')
383
384        reader.close()
385        return body
386
387    @unittest.skipIf(sys.platform == 'darwin',
388                     'undecodable name cannot always be decoded on macOS')
389    @unittest.skipIf(sys.platform == 'win32',
390                     'undecodable name cannot be decoded on win32')
391    @unittest.skipUnless(support.TESTFN_UNDECODABLE,
392                         'need support.TESTFN_UNDECODABLE')
393    def test_undecodable_filename(self):
394        enc = sys.getfilesystemencoding()
395        filename = os.fsdecode(support.TESTFN_UNDECODABLE) + '.txt'
396        with open(os.path.join(self.tempdir, filename), 'wb') as f:
397            f.write(support.TESTFN_UNDECODABLE)
398        response = self.request(self.base_url + '/')
399        if sys.platform == 'darwin':
400            # On Mac OS the HFS+ filesystem replaces bytes that aren't valid
401            # UTF-8 into a percent-encoded value.
402            for name in os.listdir(self.tempdir):
403                if name != 'test': # Ignore a filename created in setUp().
404                    filename = name
405                    break
406        body = self.check_status_and_reason(response, HTTPStatus.OK)
407        quotedname = urllib.parse.quote(filename, errors='surrogatepass')
408        self.assertIn(('href="%s"' % quotedname)
409                      .encode(enc, 'surrogateescape'), body)
410        self.assertIn(('>%s<' % html.escape(filename, quote=False))
411                      .encode(enc, 'surrogateescape'), body)
412        response = self.request(self.base_url + '/' + quotedname)
413        self.check_status_and_reason(response, HTTPStatus.OK,
414                                     data=support.TESTFN_UNDECODABLE)
415
416    def test_get(self):
417        #constructs the path relative to the root directory of the HTTPServer
418        response = self.request(self.base_url + '/test')
419        self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
420        # check for trailing "/" which should return 404. See Issue17324
421        response = self.request(self.base_url + '/test/')
422        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
423        response = self.request(self.base_url + '/')
424        self.check_status_and_reason(response, HTTPStatus.OK)
425        response = self.request(self.base_url)
426        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
427        response = self.request(self.base_url + '/?hi=2')
428        self.check_status_and_reason(response, HTTPStatus.OK)
429        response = self.request(self.base_url + '?hi=1')
430        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
431        self.assertEqual(response.getheader("Location"),
432                         self.base_url + "/?hi=1")
433        response = self.request('/ThisDoesNotExist')
434        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
435        response = self.request('/' + 'ThisDoesNotExist' + '/')
436        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
437
438        data = b"Dummy index file\r\n"
439        with open(os.path.join(self.tempdir_name, 'index.html'), 'wb') as f:
440            f.write(data)
441        response = self.request(self.base_url + '/')
442        self.check_status_and_reason(response, HTTPStatus.OK, data)
443
444        # chmod() doesn't work as expected on Windows, and filesystem
445        # permissions are ignored by root on Unix.
446        if os.name == 'posix' and os.geteuid() != 0:
447            os.chmod(self.tempdir, 0)
448            try:
449                response = self.request(self.base_url + '/')
450                self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
451            finally:
452                os.chmod(self.tempdir, 0o755)
453
454    def test_head(self):
455        response = self.request(
456            self.base_url + '/test', method='HEAD')
457        self.check_status_and_reason(response, HTTPStatus.OK)
458        self.assertEqual(response.getheader('content-length'),
459                         str(len(self.data)))
460        self.assertEqual(response.getheader('content-type'),
461                         'application/octet-stream')
462
463    def test_browser_cache(self):
464        """Check that when a request to /test is sent with the request header
465        If-Modified-Since set to date of last modification, the server returns
466        status code 304, not 200
467        """
468        headers = email.message.Message()
469        headers['If-Modified-Since'] = self.last_modif_header
470        response = self.request(self.base_url + '/test', headers=headers)
471        self.check_status_and_reason(response, HTTPStatus.NOT_MODIFIED)
472
473        # one hour after last modification : must return 304
474        new_dt = self.last_modif_datetime + datetime.timedelta(hours=1)
475        headers = email.message.Message()
476        headers['If-Modified-Since'] = email.utils.format_datetime(new_dt,
477            usegmt=True)
478        response = self.request(self.base_url + '/test', headers=headers)
479        self.check_status_and_reason(response, HTTPStatus.NOT_MODIFIED)
480
481    def test_browser_cache_file_changed(self):
482        # with If-Modified-Since earlier than Last-Modified, must return 200
483        dt = self.last_modif_datetime
484        # build datetime object : 365 days before last modification
485        old_dt = dt - datetime.timedelta(days=365)
486        headers = email.message.Message()
487        headers['If-Modified-Since'] = email.utils.format_datetime(old_dt,
488            usegmt=True)
489        response = self.request(self.base_url + '/test', headers=headers)
490        self.check_status_and_reason(response, HTTPStatus.OK)
491
492    def test_browser_cache_with_If_None_Match_header(self):
493        # if If-None-Match header is present, ignore If-Modified-Since
494
495        headers = email.message.Message()
496        headers['If-Modified-Since'] = self.last_modif_header
497        headers['If-None-Match'] = "*"
498        response = self.request(self.base_url + '/test', headers=headers)
499        self.check_status_and_reason(response, HTTPStatus.OK)
500
501    def test_invalid_requests(self):
502        response = self.request('/', method='FOO')
503        self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)
504        # requests must be case sensitive,so this should fail too
505        response = self.request('/', method='custom')
506        self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)
507        response = self.request('/', method='GETs')
508        self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED)
509
510    def test_last_modified(self):
511        """Checks that the datetime returned in Last-Modified response header
512        is the actual datetime of last modification, rounded to the second
513        """
514        response = self.request(self.base_url + '/test')
515        self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
516        last_modif_header = response.headers['Last-modified']
517        self.assertEqual(last_modif_header, self.last_modif_header)
518
519    def test_path_without_leading_slash(self):
520        response = self.request(self.tempdir_name + '/test')
521        self.check_status_and_reason(response, HTTPStatus.OK, data=self.data)
522        response = self.request(self.tempdir_name + '/test/')
523        self.check_status_and_reason(response, HTTPStatus.NOT_FOUND)
524        response = self.request(self.tempdir_name + '/')
525        self.check_status_and_reason(response, HTTPStatus.OK)
526        response = self.request(self.tempdir_name)
527        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
528        response = self.request(self.tempdir_name + '/?hi=2')
529        self.check_status_and_reason(response, HTTPStatus.OK)
530        response = self.request(self.tempdir_name + '?hi=1')
531        self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY)
532        self.assertEqual(response.getheader("Location"),
533                         self.tempdir_name + "/?hi=1")
534
535    def test_html_escape_filename(self):
536        filename = '<test&>.txt'
537        fullpath = os.path.join(self.tempdir, filename)
538
539        try:
540            open(fullpath, 'w').close()
541        except OSError:
542            raise unittest.SkipTest('Can not create file %s on current file '
543                                    'system' % filename)
544
545        try:
546            response = self.request(self.base_url + '/')
547            body = self.check_status_and_reason(response, HTTPStatus.OK)
548            enc = response.headers.get_content_charset()
549        finally:
550            os.unlink(fullpath)  # avoid affecting test_undecodable_filename
551
552        self.assertIsNotNone(enc)
553        html_text = '>%s<' % html.escape(filename, quote=False)
554        self.assertIn(html_text.encode(enc), body)
555
556
557cgi_file1 = """\
558#!%s
559
560print("Content-type: text/html")
561print()
562print("Hello World")
563"""
564
565cgi_file2 = """\
566#!%s
567import cgi
568
569print("Content-type: text/html")
570print()
571
572form = cgi.FieldStorage()
573print("%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"),
574                          form.getfirst("bacon")))
575"""
576
577cgi_file4 = """\
578#!%s
579import os
580
581print("Content-type: text/html")
582print()
583
584print(os.environ["%s"])
585"""
586
587
588@unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
589        "This test can't be run reliably as root (issue #13308).")
590class CGIHTTPServerTestCase(BaseTestCase):
591    class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler):
592        pass
593
594    linesep = os.linesep.encode('ascii')
595
596    def setUp(self):
597        BaseTestCase.setUp(self)
598        self.cwd = os.getcwd()
599        self.parent_dir = tempfile.mkdtemp()
600        self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin')
601        self.cgi_child_dir = os.path.join(self.cgi_dir, 'child-dir')
602        os.mkdir(self.cgi_dir)
603        os.mkdir(self.cgi_child_dir)
604        self.nocgi_path = None
605        self.file1_path = None
606        self.file2_path = None
607        self.file3_path = None
608        self.file4_path = None
609
610        # The shebang line should be pure ASCII: use symlink if possible.
611        # See issue #7668.
612        if support.can_symlink():
613            self.pythonexe = os.path.join(self.parent_dir, 'python')
614            os.symlink(sys.executable, self.pythonexe)
615        else:
616            self.pythonexe = sys.executable
617
618        try:
619            # The python executable path is written as the first line of the
620            # CGI Python script. The encoding cookie cannot be used, and so the
621            # path should be encodable to the default script encoding (utf-8)
622            self.pythonexe.encode('utf-8')
623        except UnicodeEncodeError:
624            self.tearDown()
625            self.skipTest("Python executable path is not encodable to utf-8")
626
627        self.nocgi_path = os.path.join(self.parent_dir, 'nocgi.py')
628        with open(self.nocgi_path, 'w') as fp:
629            fp.write(cgi_file1 % self.pythonexe)
630        os.chmod(self.nocgi_path, 0o777)
631
632        self.file1_path = os.path.join(self.cgi_dir, 'file1.py')
633        with open(self.file1_path, 'w', encoding='utf-8') as file1:
634            file1.write(cgi_file1 % self.pythonexe)
635        os.chmod(self.file1_path, 0o777)
636
637        self.file2_path = os.path.join(self.cgi_dir, 'file2.py')
638        with open(self.file2_path, 'w', encoding='utf-8') as file2:
639            file2.write(cgi_file2 % self.pythonexe)
640        os.chmod(self.file2_path, 0o777)
641
642        self.file3_path = os.path.join(self.cgi_child_dir, 'file3.py')
643        with open(self.file3_path, 'w', encoding='utf-8') as file3:
644            file3.write(cgi_file1 % self.pythonexe)
645        os.chmod(self.file3_path, 0o777)
646
647        self.file4_path = os.path.join(self.cgi_dir, 'file4.py')
648        with open(self.file4_path, 'w', encoding='utf-8') as file4:
649            file4.write(cgi_file4 % (self.pythonexe, 'QUERY_STRING'))
650        os.chmod(self.file4_path, 0o777)
651
652        os.chdir(self.parent_dir)
653
654    def tearDown(self):
655        try:
656            os.chdir(self.cwd)
657            if self.pythonexe != sys.executable:
658                os.remove(self.pythonexe)
659            if self.nocgi_path:
660                os.remove(self.nocgi_path)
661            if self.file1_path:
662                os.remove(self.file1_path)
663            if self.file2_path:
664                os.remove(self.file2_path)
665            if self.file3_path:
666                os.remove(self.file3_path)
667            if self.file4_path:
668                os.remove(self.file4_path)
669            os.rmdir(self.cgi_child_dir)
670            os.rmdir(self.cgi_dir)
671            os.rmdir(self.parent_dir)
672        finally:
673            BaseTestCase.tearDown(self)
674
675    def test_url_collapse_path(self):
676        # verify tail is the last portion and head is the rest on proper urls
677        test_vectors = {
678            '': '//',
679            '..': IndexError,
680            '/.//..': IndexError,
681            '/': '//',
682            '//': '//',
683            '/\\': '//\\',
684            '/.//': '//',
685            'cgi-bin/file1.py': '/cgi-bin/file1.py',
686            '/cgi-bin/file1.py': '/cgi-bin/file1.py',
687            'a': '//a',
688            '/a': '//a',
689            '//a': '//a',
690            './a': '//a',
691            './C:/': '/C:/',
692            '/a/b': '/a/b',
693            '/a/b/': '/a/b/',
694            '/a/b/.': '/a/b/',
695            '/a/b/c/..': '/a/b/',
696            '/a/b/c/../d': '/a/b/d',
697            '/a/b/c/../d/e/../f': '/a/b/d/f',
698            '/a/b/c/../d/e/../../f': '/a/b/f',
699            '/a/b/c/../d/e/.././././..//f': '/a/b/f',
700            '../a/b/c/../d/e/.././././..//f': IndexError,
701            '/a/b/c/../d/e/../../../f': '/a/f',
702            '/a/b/c/../d/e/../../../../f': '//f',
703            '/a/b/c/../d/e/../../../../../f': IndexError,
704            '/a/b/c/../d/e/../../../../f/..': '//',
705            '/a/b/c/../d/e/../../../../f/../.': '//',
706        }
707        for path, expected in test_vectors.items():
708            if isinstance(expected, type) and issubclass(expected, Exception):
709                self.assertRaises(expected,
710                                  server._url_collapse_path, path)
711            else:
712                actual = server._url_collapse_path(path)
713                self.assertEqual(expected, actual,
714                                 msg='path = %r\nGot:    %r\nWanted: %r' %
715                                 (path, actual, expected))
716
717    def test_headers_and_content(self):
718        res = self.request('/cgi-bin/file1.py')
719        self.assertEqual(
720            (res.read(), res.getheader('Content-type'), res.status),
721            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK))
722
723    def test_issue19435(self):
724        res = self.request('///////////nocgi.py/../cgi-bin/nothere.sh')
725        self.assertEqual(res.status, HTTPStatus.NOT_FOUND)
726
727    def test_post(self):
728        params = urllib.parse.urlencode(
729            {'spam' : 1, 'eggs' : 'python', 'bacon' : 123456})
730        headers = {'Content-type' : 'application/x-www-form-urlencoded'}
731        res = self.request('/cgi-bin/file2.py', 'POST', params, headers)
732
733        self.assertEqual(res.read(), b'1, python, 123456' + self.linesep)
734
735    def test_invaliduri(self):
736        res = self.request('/cgi-bin/invalid')
737        res.read()
738        self.assertEqual(res.status, HTTPStatus.NOT_FOUND)
739
740    def test_authorization(self):
741        headers = {b'Authorization' : b'Basic ' +
742                   base64.b64encode(b'username:pass')}
743        res = self.request('/cgi-bin/file1.py', 'GET', headers=headers)
744        self.assertEqual(
745            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
746            (res.read(), res.getheader('Content-type'), res.status))
747
748    def test_no_leading_slash(self):
749        # http://bugs.python.org/issue2254
750        res = self.request('cgi-bin/file1.py')
751        self.assertEqual(
752            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
753            (res.read(), res.getheader('Content-type'), res.status))
754
755    def test_os_environ_is_not_altered(self):
756        signature = "Test CGI Server"
757        os.environ['SERVER_SOFTWARE'] = signature
758        res = self.request('/cgi-bin/file1.py')
759        self.assertEqual(
760            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
761            (res.read(), res.getheader('Content-type'), res.status))
762        self.assertEqual(os.environ['SERVER_SOFTWARE'], signature)
763
764    def test_urlquote_decoding_in_cgi_check(self):
765        res = self.request('/cgi-bin%2ffile1.py')
766        self.assertEqual(
767            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
768            (res.read(), res.getheader('Content-type'), res.status))
769
770    def test_nested_cgi_path_issue21323(self):
771        res = self.request('/cgi-bin/child-dir/file3.py')
772        self.assertEqual(
773            (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK),
774            (res.read(), res.getheader('Content-type'), res.status))
775
776    def test_query_with_multiple_question_mark(self):
777        res = self.request('/cgi-bin/file4.py?a=b?c=d')
778        self.assertEqual(
779            (b'a=b?c=d' + self.linesep, 'text/html', HTTPStatus.OK),
780            (res.read(), res.getheader('Content-type'), res.status))
781
782    def test_query_with_continuous_slashes(self):
783        res = self.request('/cgi-bin/file4.py?k=aa%2F%2Fbb&//q//p//=//a//b//')
784        self.assertEqual(
785            (b'k=aa%2F%2Fbb&//q//p//=//a//b//' + self.linesep,
786             'text/html', HTTPStatus.OK),
787            (res.read(), res.getheader('Content-type'), res.status))
788
789
790class SocketlessRequestHandler(SimpleHTTPRequestHandler):
791    def __init__(self, *args, **kwargs):
792        request = mock.Mock()
793        request.makefile.return_value = BytesIO()
794        super().__init__(request, None, None)
795
796        self.get_called = False
797        self.protocol_version = "HTTP/1.1"
798
799    def do_GET(self):
800        self.get_called = True
801        self.send_response(HTTPStatus.OK)
802        self.send_header('Content-Type', 'text/html')
803        self.end_headers()
804        self.wfile.write(b'<html><body>Data</body></html>\r\n')
805
806    def log_message(self, format, *args):
807        pass
808
809class RejectingSocketlessRequestHandler(SocketlessRequestHandler):
810    def handle_expect_100(self):
811        self.send_error(HTTPStatus.EXPECTATION_FAILED)
812        return False
813
814
815class AuditableBytesIO:
816
817    def __init__(self):
818        self.datas = []
819
820    def write(self, data):
821        self.datas.append(data)
822
823    def getData(self):
824        return b''.join(self.datas)
825
826    @property
827    def numWrites(self):
828        return len(self.datas)
829
830
831class BaseHTTPRequestHandlerTestCase(unittest.TestCase):
832    """Test the functionality of the BaseHTTPServer.
833
834       Test the support for the Expect 100-continue header.
835       """
836
837    HTTPResponseMatch = re.compile(b'HTTP/1.[0-9]+ 200 OK')
838
839    def setUp (self):
840        self.handler = SocketlessRequestHandler()
841
842    def send_typical_request(self, message):
843        input = BytesIO(message)
844        output = BytesIO()
845        self.handler.rfile = input
846        self.handler.wfile = output
847        self.handler.handle_one_request()
848        output.seek(0)
849        return output.readlines()
850
851    def verify_get_called(self):
852        self.assertTrue(self.handler.get_called)
853
854    def verify_expected_headers(self, headers):
855        for fieldName in b'Server: ', b'Date: ', b'Content-Type: ':
856            self.assertEqual(sum(h.startswith(fieldName) for h in headers), 1)
857
858    def verify_http_server_response(self, response):
859        match = self.HTTPResponseMatch.search(response)
860        self.assertIsNotNone(match)
861
862    def test_http_1_1(self):
863        result = self.send_typical_request(b'GET / HTTP/1.1\r\n\r\n')
864        self.verify_http_server_response(result[0])
865        self.verify_expected_headers(result[1:-1])
866        self.verify_get_called()
867        self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
868        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
869        self.assertEqual(self.handler.command, 'GET')
870        self.assertEqual(self.handler.path, '/')
871        self.assertEqual(self.handler.request_version, 'HTTP/1.1')
872        self.assertSequenceEqual(self.handler.headers.items(), ())
873
874    def test_http_1_0(self):
875        result = self.send_typical_request(b'GET / HTTP/1.0\r\n\r\n')
876        self.verify_http_server_response(result[0])
877        self.verify_expected_headers(result[1:-1])
878        self.verify_get_called()
879        self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
880        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.0')
881        self.assertEqual(self.handler.command, 'GET')
882        self.assertEqual(self.handler.path, '/')
883        self.assertEqual(self.handler.request_version, 'HTTP/1.0')
884        self.assertSequenceEqual(self.handler.headers.items(), ())
885
886    def test_http_0_9(self):
887        result = self.send_typical_request(b'GET / HTTP/0.9\r\n\r\n')
888        self.assertEqual(len(result), 1)
889        self.assertEqual(result[0], b'<html><body>Data</body></html>\r\n')
890        self.verify_get_called()
891
892    def test_extra_space(self):
893        result = self.send_typical_request(
894            b'GET /spaced out HTTP/1.1\r\n'
895            b'Host: dummy\r\n'
896            b'\r\n'
897        )
898        self.assertTrue(result[0].startswith(b'HTTP/1.1 400 '))
899        self.verify_expected_headers(result[1:result.index(b'\r\n')])
900        self.assertFalse(self.handler.get_called)
901
902    def test_with_continue_1_0(self):
903        result = self.send_typical_request(b'GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n')
904        self.verify_http_server_response(result[0])
905        self.verify_expected_headers(result[1:-1])
906        self.verify_get_called()
907        self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
908        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.0')
909        self.assertEqual(self.handler.command, 'GET')
910        self.assertEqual(self.handler.path, '/')
911        self.assertEqual(self.handler.request_version, 'HTTP/1.0')
912        headers = (("Expect", "100-continue"),)
913        self.assertSequenceEqual(self.handler.headers.items(), headers)
914
915    def test_with_continue_1_1(self):
916        result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
917        self.assertEqual(result[0], b'HTTP/1.1 100 Continue\r\n')
918        self.assertEqual(result[1], b'\r\n')
919        self.assertEqual(result[2], b'HTTP/1.1 200 OK\r\n')
920        self.verify_expected_headers(result[2:-1])
921        self.verify_get_called()
922        self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n')
923        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
924        self.assertEqual(self.handler.command, 'GET')
925        self.assertEqual(self.handler.path, '/')
926        self.assertEqual(self.handler.request_version, 'HTTP/1.1')
927        headers = (("Expect", "100-continue"),)
928        self.assertSequenceEqual(self.handler.headers.items(), headers)
929
930    def test_header_buffering_of_send_error(self):
931
932        input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
933        output = AuditableBytesIO()
934        handler = SocketlessRequestHandler()
935        handler.rfile = input
936        handler.wfile = output
937        handler.request_version = 'HTTP/1.1'
938        handler.requestline = ''
939        handler.command = None
940
941        handler.send_error(418)
942        self.assertEqual(output.numWrites, 2)
943
944    def test_header_buffering_of_send_response_only(self):
945
946        input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
947        output = AuditableBytesIO()
948        handler = SocketlessRequestHandler()
949        handler.rfile = input
950        handler.wfile = output
951        handler.request_version = 'HTTP/1.1'
952
953        handler.send_response_only(418)
954        self.assertEqual(output.numWrites, 0)
955        handler.end_headers()
956        self.assertEqual(output.numWrites, 1)
957
958    def test_header_buffering_of_send_header(self):
959
960        input = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
961        output = AuditableBytesIO()
962        handler = SocketlessRequestHandler()
963        handler.rfile = input
964        handler.wfile = output
965        handler.request_version = 'HTTP/1.1'
966
967        handler.send_header('Foo', 'foo')
968        handler.send_header('bar', 'bar')
969        self.assertEqual(output.numWrites, 0)
970        handler.end_headers()
971        self.assertEqual(output.getData(), b'Foo: foo\r\nbar: bar\r\n\r\n')
972        self.assertEqual(output.numWrites, 1)
973
974    def test_header_unbuffered_when_continue(self):
975
976        def _readAndReseek(f):
977            pos = f.tell()
978            f.seek(0)
979            data = f.read()
980            f.seek(pos)
981            return data
982
983        input = BytesIO(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
984        output = BytesIO()
985        self.handler.rfile = input
986        self.handler.wfile = output
987        self.handler.request_version = 'HTTP/1.1'
988
989        self.handler.handle_one_request()
990        self.assertNotEqual(_readAndReseek(output), b'')
991        result = _readAndReseek(output).split(b'\r\n')
992        self.assertEqual(result[0], b'HTTP/1.1 100 Continue')
993        self.assertEqual(result[1], b'')
994        self.assertEqual(result[2], b'HTTP/1.1 200 OK')
995
996    def test_with_continue_rejected(self):
997        usual_handler = self.handler        # Save to avoid breaking any subsequent tests.
998        self.handler = RejectingSocketlessRequestHandler()
999        result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
1000        self.assertEqual(result[0], b'HTTP/1.1 417 Expectation Failed\r\n')
1001        self.verify_expected_headers(result[1:-1])
1002        # The expect handler should short circuit the usual get method by
1003        # returning false here, so get_called should be false
1004        self.assertFalse(self.handler.get_called)
1005        self.assertEqual(sum(r == b'Connection: close\r\n' for r in result[1:-1]), 1)
1006        self.handler = usual_handler        # Restore to avoid breaking any subsequent tests.
1007
1008    def test_request_length(self):
1009        # Issue #10714: huge request lines are discarded, to avoid Denial
1010        # of Service attacks.
1011        result = self.send_typical_request(b'GET ' + b'x' * 65537)
1012        self.assertEqual(result[0], b'HTTP/1.1 414 Request-URI Too Long\r\n')
1013        self.assertFalse(self.handler.get_called)
1014        self.assertIsInstance(self.handler.requestline, str)
1015
1016    def test_header_length(self):
1017        # Issue #6791: same for headers
1018        result = self.send_typical_request(
1019            b'GET / HTTP/1.1\r\nX-Foo: bar' + b'r' * 65537 + b'\r\n\r\n')
1020        self.assertEqual(result[0], b'HTTP/1.1 431 Line too long\r\n')
1021        self.assertFalse(self.handler.get_called)
1022        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
1023
1024    def test_too_many_headers(self):
1025        result = self.send_typical_request(
1026            b'GET / HTTP/1.1\r\n' + b'X-Foo: bar\r\n' * 101 + b'\r\n')
1027        self.assertEqual(result[0], b'HTTP/1.1 431 Too many headers\r\n')
1028        self.assertFalse(self.handler.get_called)
1029        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')
1030
1031    def test_html_escape_on_error(self):
1032        result = self.send_typical_request(
1033            b'<script>alert("hello")</script> / HTTP/1.1')
1034        result = b''.join(result)
1035        text = '<script>alert("hello")</script>'
1036        self.assertIn(html.escape(text, quote=False).encode('ascii'), result)
1037
1038    def test_close_connection(self):
1039        # handle_one_request() should be repeatedly called until
1040        # it sets close_connection
1041        def handle_one_request():
1042            self.handler.close_connection = next(close_values)
1043        self.handler.handle_one_request = handle_one_request
1044
1045        close_values = iter((True,))
1046        self.handler.handle()
1047        self.assertRaises(StopIteration, next, close_values)
1048
1049        close_values = iter((False, False, True))
1050        self.handler.handle()
1051        self.assertRaises(StopIteration, next, close_values)
1052
1053    def test_date_time_string(self):
1054        now = time.time()
1055        # this is the old code that formats the timestamp
1056        year, month, day, hh, mm, ss, wd, y, z = time.gmtime(now)
1057        expected = "%s, %02d %3s %4d %02d:%02d:%02d GMT" % (
1058            self.handler.weekdayname[wd],
1059            day,
1060            self.handler.monthname[month],
1061            year, hh, mm, ss
1062        )
1063        self.assertEqual(self.handler.date_time_string(timestamp=now), expected)
1064
1065
1066class SimpleHTTPRequestHandlerTestCase(unittest.TestCase):
1067    """ Test url parsing """
1068    def setUp(self):
1069        self.translated = os.getcwd()
1070        self.translated = os.path.join(self.translated, 'filename')
1071        self.handler = SocketlessRequestHandler()
1072
1073    def test_query_arguments(self):
1074        path = self.handler.translate_path('/filename')
1075        self.assertEqual(path, self.translated)
1076        path = self.handler.translate_path('/filename?foo=bar')
1077        self.assertEqual(path, self.translated)
1078        path = self.handler.translate_path('/filename?a=b&spam=eggs#zot')
1079        self.assertEqual(path, self.translated)
1080
1081    def test_start_with_double_slash(self):
1082        path = self.handler.translate_path('//filename')
1083        self.assertEqual(path, self.translated)
1084        path = self.handler.translate_path('//filename?foo=bar')
1085        self.assertEqual(path, self.translated)
1086
1087    def test_windows_colon(self):
1088        with support.swap_attr(server.os, 'path', ntpath):
1089            path = self.handler.translate_path('c:c:c:foo/filename')
1090            path = path.replace(ntpath.sep, os.sep)
1091            self.assertEqual(path, self.translated)
1092
1093            path = self.handler.translate_path('\\c:../filename')
1094            path = path.replace(ntpath.sep, os.sep)
1095            self.assertEqual(path, self.translated)
1096
1097            path = self.handler.translate_path('c:\\c:..\\foo/filename')
1098            path = path.replace(ntpath.sep, os.sep)
1099            self.assertEqual(path, self.translated)
1100
1101            path = self.handler.translate_path('c:c:foo\\c:c:bar/filename')
1102            path = path.replace(ntpath.sep, os.sep)
1103            self.assertEqual(path, self.translated)
1104
1105
1106class MiscTestCase(unittest.TestCase):
1107    def test_all(self):
1108        expected = []
1109        blacklist = {'executable', 'nobody_uid', 'test'}
1110        for name in dir(server):
1111            if name.startswith('_') or name in blacklist:
1112                continue
1113            module_object = getattr(server, name)
1114            if getattr(module_object, '__module__', None) == 'http.server':
1115                expected.append(name)
1116        self.assertCountEqual(server.__all__, expected)
1117
1118
1119def test_main(verbose=None):
1120    cwd = os.getcwd()
1121    try:
1122        support.run_unittest(
1123            RequestHandlerLoggingTestCase,
1124            BaseHTTPRequestHandlerTestCase,
1125            BaseHTTPServerTestCase,
1126            SimpleHTTPServerTestCase,
1127            CGIHTTPServerTestCase,
1128            SimpleHTTPRequestHandlerTestCase,
1129            MiscTestCase,
1130        )
1131    finally:
1132        os.chdir(cwd)
1133
1134if __name__ == '__main__':
1135    test_main()
1136