1"""Unittests for the various HTTPServer modules. 2 3Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>, 4Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest. 5""" 6 7from http.server import BaseHTTPRequestHandler, HTTPServer, \ 8 SimpleHTTPRequestHandler, CGIHTTPRequestHandler 9from http import server, HTTPStatus 10 11import os 12import sys 13import re 14import base64 15import ntpath 16import shutil 17import email.message 18import email.utils 19import html 20import http.client 21import urllib.parse 22import tempfile 23import time 24import datetime 25import threading 26from unittest import mock 27from io import BytesIO 28 29import unittest 30from test import support 31 32 33class NoLogRequestHandler: 34 def log_message(self, *args): 35 # don't write log messages to stderr 36 pass 37 38 def read(self, n=None): 39 return '' 40 41 42class TestServerThread(threading.Thread): 43 def __init__(self, test_object, request_handler): 44 threading.Thread.__init__(self) 45 self.request_handler = request_handler 46 self.test_object = test_object 47 48 def run(self): 49 self.server = HTTPServer(('localhost', 0), self.request_handler) 50 self.test_object.HOST, self.test_object.PORT = self.server.socket.getsockname() 51 self.test_object.server_started.set() 52 self.test_object = None 53 try: 54 self.server.serve_forever(0.05) 55 finally: 56 self.server.server_close() 57 58 def stop(self): 59 self.server.shutdown() 60 self.join() 61 62 63class BaseTestCase(unittest.TestCase): 64 def setUp(self): 65 self._threads = support.threading_setup() 66 os.environ = support.EnvironmentVarGuard() 67 self.server_started = threading.Event() 68 self.thread = TestServerThread(self, self.request_handler) 69 self.thread.start() 70 self.server_started.wait() 71 72 def tearDown(self): 73 self.thread.stop() 74 self.thread = None 75 os.environ.__exit__() 76 support.threading_cleanup(*self._threads) 77 78 def request(self, uri, method='GET', body=None, headers={}): 79 self.connection = http.client.HTTPConnection(self.HOST, self.PORT) 80 self.connection.request(method, uri, body, headers) 81 return self.connection.getresponse() 82 83 84class BaseHTTPServerTestCase(BaseTestCase): 85 class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler): 86 protocol_version = 'HTTP/1.1' 87 default_request_version = 'HTTP/1.1' 88 89 def do_TEST(self): 90 self.send_response(HTTPStatus.NO_CONTENT) 91 self.send_header('Content-Type', 'text/html') 92 self.send_header('Connection', 'close') 93 self.end_headers() 94 95 def do_KEEP(self): 96 self.send_response(HTTPStatus.NO_CONTENT) 97 self.send_header('Content-Type', 'text/html') 98 self.send_header('Connection', 'keep-alive') 99 self.end_headers() 100 101 def do_KEYERROR(self): 102 self.send_error(999) 103 104 def do_NOTFOUND(self): 105 self.send_error(HTTPStatus.NOT_FOUND) 106 107 def do_EXPLAINERROR(self): 108 self.send_error(999, "Short Message", 109 "This is a long \n explanation") 110 111 def do_CUSTOM(self): 112 self.send_response(999) 113 self.send_header('Content-Type', 'text/html') 114 self.send_header('Connection', 'close') 115 self.end_headers() 116 117 def do_LATINONEHEADER(self): 118 self.send_response(999) 119 self.send_header('X-Special', 'Dängerous Mind') 120 self.send_header('Connection', 'close') 121 self.end_headers() 122 body = self.headers['x-special-incoming'].encode('utf-8') 123 self.wfile.write(body) 124 125 def do_SEND_ERROR(self): 126 self.send_error(int(self.path[1:])) 127 128 def do_HEAD(self): 129 self.send_error(int(self.path[1:])) 130 131 def setUp(self): 132 BaseTestCase.setUp(self) 133 self.con = http.client.HTTPConnection(self.HOST, self.PORT) 134 self.con.connect() 135 136 def test_command(self): 137 self.con.request('GET', '/') 138 res = self.con.getresponse() 139 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 140 141 def test_request_line_trimming(self): 142 self.con._http_vsn_str = 'HTTP/1.1\n' 143 self.con.putrequest('XYZBOGUS', '/') 144 self.con.endheaders() 145 res = self.con.getresponse() 146 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 147 148 def test_version_bogus(self): 149 self.con._http_vsn_str = 'FUBAR' 150 self.con.putrequest('GET', '/') 151 self.con.endheaders() 152 res = self.con.getresponse() 153 self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) 154 155 def test_version_digits(self): 156 self.con._http_vsn_str = 'HTTP/9.9.9' 157 self.con.putrequest('GET', '/') 158 self.con.endheaders() 159 res = self.con.getresponse() 160 self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) 161 162 def test_version_none_get(self): 163 self.con._http_vsn_str = '' 164 self.con.putrequest('GET', '/') 165 self.con.endheaders() 166 res = self.con.getresponse() 167 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 168 169 def test_version_none(self): 170 # Test that a valid method is rejected when not HTTP/1.x 171 self.con._http_vsn_str = '' 172 self.con.putrequest('CUSTOM', '/') 173 self.con.endheaders() 174 res = self.con.getresponse() 175 self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) 176 177 def test_version_invalid(self): 178 self.con._http_vsn = 99 179 self.con._http_vsn_str = 'HTTP/9.9' 180 self.con.putrequest('GET', '/') 181 self.con.endheaders() 182 res = self.con.getresponse() 183 self.assertEqual(res.status, HTTPStatus.HTTP_VERSION_NOT_SUPPORTED) 184 185 def test_send_blank(self): 186 self.con._http_vsn_str = '' 187 self.con.putrequest('', '') 188 self.con.endheaders() 189 res = self.con.getresponse() 190 self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) 191 192 def test_header_close(self): 193 self.con.putrequest('GET', '/') 194 self.con.putheader('Connection', 'close') 195 self.con.endheaders() 196 res = self.con.getresponse() 197 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 198 199 def test_header_keep_alive(self): 200 self.con._http_vsn_str = 'HTTP/1.1' 201 self.con.putrequest('GET', '/') 202 self.con.putheader('Connection', 'keep-alive') 203 self.con.endheaders() 204 res = self.con.getresponse() 205 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 206 207 def test_handler(self): 208 self.con.request('TEST', '/') 209 res = self.con.getresponse() 210 self.assertEqual(res.status, HTTPStatus.NO_CONTENT) 211 212 def test_return_header_keep_alive(self): 213 self.con.request('KEEP', '/') 214 res = self.con.getresponse() 215 self.assertEqual(res.getheader('Connection'), 'keep-alive') 216 self.con.request('TEST', '/') 217 self.addCleanup(self.con.close) 218 219 def test_internal_key_error(self): 220 self.con.request('KEYERROR', '/') 221 res = self.con.getresponse() 222 self.assertEqual(res.status, 999) 223 224 def test_return_custom_status(self): 225 self.con.request('CUSTOM', '/') 226 res = self.con.getresponse() 227 self.assertEqual(res.status, 999) 228 229 def test_return_explain_error(self): 230 self.con.request('EXPLAINERROR', '/') 231 res = self.con.getresponse() 232 self.assertEqual(res.status, 999) 233 self.assertTrue(int(res.getheader('Content-Length'))) 234 235 def test_latin1_header(self): 236 self.con.request('LATINONEHEADER', '/', headers={ 237 'X-Special-Incoming': 'Ärger mit Unicode' 238 }) 239 res = self.con.getresponse() 240 self.assertEqual(res.getheader('X-Special'), 'Dängerous Mind') 241 self.assertEqual(res.read(), 'Ärger mit Unicode'.encode('utf-8')) 242 243 def test_error_content_length(self): 244 # Issue #16088: standard error responses should have a content-length 245 self.con.request('NOTFOUND', '/') 246 res = self.con.getresponse() 247 self.assertEqual(res.status, HTTPStatus.NOT_FOUND) 248 249 data = res.read() 250 self.assertEqual(int(res.getheader('Content-Length')), len(data)) 251 252 def test_send_error(self): 253 allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED, 254 HTTPStatus.RESET_CONTENT) 255 for code in (HTTPStatus.NO_CONTENT, HTTPStatus.NOT_MODIFIED, 256 HTTPStatus.PROCESSING, HTTPStatus.RESET_CONTENT, 257 HTTPStatus.SWITCHING_PROTOCOLS): 258 self.con.request('SEND_ERROR', '/{}'.format(code)) 259 res = self.con.getresponse() 260 self.assertEqual(code, res.status) 261 self.assertEqual(None, res.getheader('Content-Length')) 262 self.assertEqual(None, res.getheader('Content-Type')) 263 if code not in allow_transfer_encoding_codes: 264 self.assertEqual(None, res.getheader('Transfer-Encoding')) 265 266 data = res.read() 267 self.assertEqual(b'', data) 268 269 def test_head_via_send_error(self): 270 allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED, 271 HTTPStatus.RESET_CONTENT) 272 for code in (HTTPStatus.OK, HTTPStatus.NO_CONTENT, 273 HTTPStatus.NOT_MODIFIED, HTTPStatus.RESET_CONTENT, 274 HTTPStatus.SWITCHING_PROTOCOLS): 275 self.con.request('HEAD', '/{}'.format(code)) 276 res = self.con.getresponse() 277 self.assertEqual(code, res.status) 278 if code == HTTPStatus.OK: 279 self.assertTrue(int(res.getheader('Content-Length')) > 0) 280 self.assertIn('text/html', res.getheader('Content-Type')) 281 else: 282 self.assertEqual(None, res.getheader('Content-Length')) 283 self.assertEqual(None, res.getheader('Content-Type')) 284 if code not in allow_transfer_encoding_codes: 285 self.assertEqual(None, res.getheader('Transfer-Encoding')) 286 287 data = res.read() 288 self.assertEqual(b'', data) 289 290 291class RequestHandlerLoggingTestCase(BaseTestCase): 292 class request_handler(BaseHTTPRequestHandler): 293 protocol_version = 'HTTP/1.1' 294 default_request_version = 'HTTP/1.1' 295 296 def do_GET(self): 297 self.send_response(HTTPStatus.OK) 298 self.end_headers() 299 300 def do_ERROR(self): 301 self.send_error(HTTPStatus.NOT_FOUND, 'File not found') 302 303 def test_get(self): 304 self.con = http.client.HTTPConnection(self.HOST, self.PORT) 305 self.con.connect() 306 307 with support.captured_stderr() as err: 308 self.con.request('GET', '/') 309 self.con.getresponse() 310 311 self.assertTrue( 312 err.getvalue().endswith('"GET / HTTP/1.1" 200 -\n')) 313 314 def test_err(self): 315 self.con = http.client.HTTPConnection(self.HOST, self.PORT) 316 self.con.connect() 317 318 with support.captured_stderr() as err: 319 self.con.request('ERROR', '/') 320 self.con.getresponse() 321 322 lines = err.getvalue().split('\n') 323 self.assertTrue(lines[0].endswith('code 404, message File not found')) 324 self.assertTrue(lines[1].endswith('"ERROR / HTTP/1.1" 404 -')) 325 326 327class SimpleHTTPServerTestCase(BaseTestCase): 328 class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler): 329 pass 330 331 def setUp(self): 332 BaseTestCase.setUp(self) 333 self.cwd = os.getcwd() 334 basetempdir = tempfile.gettempdir() 335 os.chdir(basetempdir) 336 self.data = b'We are the knights who say Ni!' 337 self.tempdir = tempfile.mkdtemp(dir=basetempdir) 338 self.tempdir_name = os.path.basename(self.tempdir) 339 self.base_url = '/' + self.tempdir_name 340 tempname = os.path.join(self.tempdir, 'test') 341 with open(tempname, 'wb') as temp: 342 temp.write(self.data) 343 temp.flush() 344 mtime = os.stat(tempname).st_mtime 345 # compute last modification datetime for browser cache tests 346 last_modif = datetime.datetime.fromtimestamp(mtime, 347 datetime.timezone.utc) 348 self.last_modif_datetime = last_modif.replace(microsecond=0) 349 self.last_modif_header = email.utils.formatdate( 350 last_modif.timestamp(), usegmt=True) 351 352 def tearDown(self): 353 try: 354 os.chdir(self.cwd) 355 try: 356 shutil.rmtree(self.tempdir) 357 except: 358 pass 359 finally: 360 BaseTestCase.tearDown(self) 361 362 def check_status_and_reason(self, response, status, data=None): 363 def close_conn(): 364 """Don't close reader yet so we can check if there was leftover 365 buffered input""" 366 nonlocal reader 367 reader = response.fp 368 response.fp = None 369 reader = None 370 response._close_conn = close_conn 371 372 body = response.read() 373 self.assertTrue(response) 374 self.assertEqual(response.status, status) 375 self.assertIsNotNone(response.reason) 376 if data: 377 self.assertEqual(data, body) 378 # Ensure the server has not set up a persistent connection, and has 379 # not sent any extra data 380 self.assertEqual(response.version, 10) 381 self.assertEqual(response.msg.get("Connection", "close"), "close") 382 self.assertEqual(reader.read(30), b'', 'Connection should be closed') 383 384 reader.close() 385 return body 386 387 @unittest.skipIf(sys.platform == 'darwin', 388 'undecodable name cannot always be decoded on macOS') 389 @unittest.skipIf(sys.platform == 'win32', 390 'undecodable name cannot be decoded on win32') 391 @unittest.skipUnless(support.TESTFN_UNDECODABLE, 392 'need support.TESTFN_UNDECODABLE') 393 def test_undecodable_filename(self): 394 enc = sys.getfilesystemencoding() 395 filename = os.fsdecode(support.TESTFN_UNDECODABLE) + '.txt' 396 with open(os.path.join(self.tempdir, filename), 'wb') as f: 397 f.write(support.TESTFN_UNDECODABLE) 398 response = self.request(self.base_url + '/') 399 if sys.platform == 'darwin': 400 # On Mac OS the HFS+ filesystem replaces bytes that aren't valid 401 # UTF-8 into a percent-encoded value. 402 for name in os.listdir(self.tempdir): 403 if name != 'test': # Ignore a filename created in setUp(). 404 filename = name 405 break 406 body = self.check_status_and_reason(response, HTTPStatus.OK) 407 quotedname = urllib.parse.quote(filename, errors='surrogatepass') 408 self.assertIn(('href="%s"' % quotedname) 409 .encode(enc, 'surrogateescape'), body) 410 self.assertIn(('>%s<' % html.escape(filename, quote=False)) 411 .encode(enc, 'surrogateescape'), body) 412 response = self.request(self.base_url + '/' + quotedname) 413 self.check_status_and_reason(response, HTTPStatus.OK, 414 data=support.TESTFN_UNDECODABLE) 415 416 def test_get(self): 417 #constructs the path relative to the root directory of the HTTPServer 418 response = self.request(self.base_url + '/test') 419 self.check_status_and_reason(response, HTTPStatus.OK, data=self.data) 420 # check for trailing "/" which should return 404. See Issue17324 421 response = self.request(self.base_url + '/test/') 422 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 423 response = self.request(self.base_url + '/') 424 self.check_status_and_reason(response, HTTPStatus.OK) 425 response = self.request(self.base_url) 426 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 427 response = self.request(self.base_url + '/?hi=2') 428 self.check_status_and_reason(response, HTTPStatus.OK) 429 response = self.request(self.base_url + '?hi=1') 430 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 431 self.assertEqual(response.getheader("Location"), 432 self.base_url + "/?hi=1") 433 response = self.request('/ThisDoesNotExist') 434 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 435 response = self.request('/' + 'ThisDoesNotExist' + '/') 436 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 437 438 data = b"Dummy index file\r\n" 439 with open(os.path.join(self.tempdir_name, 'index.html'), 'wb') as f: 440 f.write(data) 441 response = self.request(self.base_url + '/') 442 self.check_status_and_reason(response, HTTPStatus.OK, data) 443 444 # chmod() doesn't work as expected on Windows, and filesystem 445 # permissions are ignored by root on Unix. 446 if os.name == 'posix' and os.geteuid() != 0: 447 os.chmod(self.tempdir, 0) 448 try: 449 response = self.request(self.base_url + '/') 450 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 451 finally: 452 os.chmod(self.tempdir, 0o755) 453 454 def test_head(self): 455 response = self.request( 456 self.base_url + '/test', method='HEAD') 457 self.check_status_and_reason(response, HTTPStatus.OK) 458 self.assertEqual(response.getheader('content-length'), 459 str(len(self.data))) 460 self.assertEqual(response.getheader('content-type'), 461 'application/octet-stream') 462 463 def test_browser_cache(self): 464 """Check that when a request to /test is sent with the request header 465 If-Modified-Since set to date of last modification, the server returns 466 status code 304, not 200 467 """ 468 headers = email.message.Message() 469 headers['If-Modified-Since'] = self.last_modif_header 470 response = self.request(self.base_url + '/test', headers=headers) 471 self.check_status_and_reason(response, HTTPStatus.NOT_MODIFIED) 472 473 # one hour after last modification : must return 304 474 new_dt = self.last_modif_datetime + datetime.timedelta(hours=1) 475 headers = email.message.Message() 476 headers['If-Modified-Since'] = email.utils.format_datetime(new_dt, 477 usegmt=True) 478 response = self.request(self.base_url + '/test', headers=headers) 479 self.check_status_and_reason(response, HTTPStatus.NOT_MODIFIED) 480 481 def test_browser_cache_file_changed(self): 482 # with If-Modified-Since earlier than Last-Modified, must return 200 483 dt = self.last_modif_datetime 484 # build datetime object : 365 days before last modification 485 old_dt = dt - datetime.timedelta(days=365) 486 headers = email.message.Message() 487 headers['If-Modified-Since'] = email.utils.format_datetime(old_dt, 488 usegmt=True) 489 response = self.request(self.base_url + '/test', headers=headers) 490 self.check_status_and_reason(response, HTTPStatus.OK) 491 492 def test_browser_cache_with_If_None_Match_header(self): 493 # if If-None-Match header is present, ignore If-Modified-Since 494 495 headers = email.message.Message() 496 headers['If-Modified-Since'] = self.last_modif_header 497 headers['If-None-Match'] = "*" 498 response = self.request(self.base_url + '/test', headers=headers) 499 self.check_status_and_reason(response, HTTPStatus.OK) 500 501 def test_invalid_requests(self): 502 response = self.request('/', method='FOO') 503 self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED) 504 # requests must be case sensitive,so this should fail too 505 response = self.request('/', method='custom') 506 self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED) 507 response = self.request('/', method='GETs') 508 self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED) 509 510 def test_last_modified(self): 511 """Checks that the datetime returned in Last-Modified response header 512 is the actual datetime of last modification, rounded to the second 513 """ 514 response = self.request(self.base_url + '/test') 515 self.check_status_and_reason(response, HTTPStatus.OK, data=self.data) 516 last_modif_header = response.headers['Last-modified'] 517 self.assertEqual(last_modif_header, self.last_modif_header) 518 519 def test_path_without_leading_slash(self): 520 response = self.request(self.tempdir_name + '/test') 521 self.check_status_and_reason(response, HTTPStatus.OK, data=self.data) 522 response = self.request(self.tempdir_name + '/test/') 523 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 524 response = self.request(self.tempdir_name + '/') 525 self.check_status_and_reason(response, HTTPStatus.OK) 526 response = self.request(self.tempdir_name) 527 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 528 response = self.request(self.tempdir_name + '/?hi=2') 529 self.check_status_and_reason(response, HTTPStatus.OK) 530 response = self.request(self.tempdir_name + '?hi=1') 531 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 532 self.assertEqual(response.getheader("Location"), 533 self.tempdir_name + "/?hi=1") 534 535 def test_html_escape_filename(self): 536 filename = '<test&>.txt' 537 fullpath = os.path.join(self.tempdir, filename) 538 539 try: 540 open(fullpath, 'w').close() 541 except OSError: 542 raise unittest.SkipTest('Can not create file %s on current file ' 543 'system' % filename) 544 545 try: 546 response = self.request(self.base_url + '/') 547 body = self.check_status_and_reason(response, HTTPStatus.OK) 548 enc = response.headers.get_content_charset() 549 finally: 550 os.unlink(fullpath) # avoid affecting test_undecodable_filename 551 552 self.assertIsNotNone(enc) 553 html_text = '>%s<' % html.escape(filename, quote=False) 554 self.assertIn(html_text.encode(enc), body) 555 556 557cgi_file1 = """\ 558#!%s 559 560print("Content-type: text/html") 561print() 562print("Hello World") 563""" 564 565cgi_file2 = """\ 566#!%s 567import cgi 568 569print("Content-type: text/html") 570print() 571 572form = cgi.FieldStorage() 573print("%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"), 574 form.getfirst("bacon"))) 575""" 576 577cgi_file4 = """\ 578#!%s 579import os 580 581print("Content-type: text/html") 582print() 583 584print(os.environ["%s"]) 585""" 586 587 588@unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0, 589 "This test can't be run reliably as root (issue #13308).") 590class CGIHTTPServerTestCase(BaseTestCase): 591 class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler): 592 pass 593 594 linesep = os.linesep.encode('ascii') 595 596 def setUp(self): 597 BaseTestCase.setUp(self) 598 self.cwd = os.getcwd() 599 self.parent_dir = tempfile.mkdtemp() 600 self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin') 601 self.cgi_child_dir = os.path.join(self.cgi_dir, 'child-dir') 602 os.mkdir(self.cgi_dir) 603 os.mkdir(self.cgi_child_dir) 604 self.nocgi_path = None 605 self.file1_path = None 606 self.file2_path = None 607 self.file3_path = None 608 self.file4_path = None 609 610 # The shebang line should be pure ASCII: use symlink if possible. 611 # See issue #7668. 612 if support.can_symlink(): 613 self.pythonexe = os.path.join(self.parent_dir, 'python') 614 os.symlink(sys.executable, self.pythonexe) 615 else: 616 self.pythonexe = sys.executable 617 618 try: 619 # The python executable path is written as the first line of the 620 # CGI Python script. The encoding cookie cannot be used, and so the 621 # path should be encodable to the default script encoding (utf-8) 622 self.pythonexe.encode('utf-8') 623 except UnicodeEncodeError: 624 self.tearDown() 625 self.skipTest("Python executable path is not encodable to utf-8") 626 627 self.nocgi_path = os.path.join(self.parent_dir, 'nocgi.py') 628 with open(self.nocgi_path, 'w') as fp: 629 fp.write(cgi_file1 % self.pythonexe) 630 os.chmod(self.nocgi_path, 0o777) 631 632 self.file1_path = os.path.join(self.cgi_dir, 'file1.py') 633 with open(self.file1_path, 'w', encoding='utf-8') as file1: 634 file1.write(cgi_file1 % self.pythonexe) 635 os.chmod(self.file1_path, 0o777) 636 637 self.file2_path = os.path.join(self.cgi_dir, 'file2.py') 638 with open(self.file2_path, 'w', encoding='utf-8') as file2: 639 file2.write(cgi_file2 % self.pythonexe) 640 os.chmod(self.file2_path, 0o777) 641 642 self.file3_path = os.path.join(self.cgi_child_dir, 'file3.py') 643 with open(self.file3_path, 'w', encoding='utf-8') as file3: 644 file3.write(cgi_file1 % self.pythonexe) 645 os.chmod(self.file3_path, 0o777) 646 647 self.file4_path = os.path.join(self.cgi_dir, 'file4.py') 648 with open(self.file4_path, 'w', encoding='utf-8') as file4: 649 file4.write(cgi_file4 % (self.pythonexe, 'QUERY_STRING')) 650 os.chmod(self.file4_path, 0o777) 651 652 os.chdir(self.parent_dir) 653 654 def tearDown(self): 655 try: 656 os.chdir(self.cwd) 657 if self.pythonexe != sys.executable: 658 os.remove(self.pythonexe) 659 if self.nocgi_path: 660 os.remove(self.nocgi_path) 661 if self.file1_path: 662 os.remove(self.file1_path) 663 if self.file2_path: 664 os.remove(self.file2_path) 665 if self.file3_path: 666 os.remove(self.file3_path) 667 if self.file4_path: 668 os.remove(self.file4_path) 669 os.rmdir(self.cgi_child_dir) 670 os.rmdir(self.cgi_dir) 671 os.rmdir(self.parent_dir) 672 finally: 673 BaseTestCase.tearDown(self) 674 675 def test_url_collapse_path(self): 676 # verify tail is the last portion and head is the rest on proper urls 677 test_vectors = { 678 '': '//', 679 '..': IndexError, 680 '/.//..': IndexError, 681 '/': '//', 682 '//': '//', 683 '/\\': '//\\', 684 '/.//': '//', 685 'cgi-bin/file1.py': '/cgi-bin/file1.py', 686 '/cgi-bin/file1.py': '/cgi-bin/file1.py', 687 'a': '//a', 688 '/a': '//a', 689 '//a': '//a', 690 './a': '//a', 691 './C:/': '/C:/', 692 '/a/b': '/a/b', 693 '/a/b/': '/a/b/', 694 '/a/b/.': '/a/b/', 695 '/a/b/c/..': '/a/b/', 696 '/a/b/c/../d': '/a/b/d', 697 '/a/b/c/../d/e/../f': '/a/b/d/f', 698 '/a/b/c/../d/e/../../f': '/a/b/f', 699 '/a/b/c/../d/e/.././././..//f': '/a/b/f', 700 '../a/b/c/../d/e/.././././..//f': IndexError, 701 '/a/b/c/../d/e/../../../f': '/a/f', 702 '/a/b/c/../d/e/../../../../f': '//f', 703 '/a/b/c/../d/e/../../../../../f': IndexError, 704 '/a/b/c/../d/e/../../../../f/..': '//', 705 '/a/b/c/../d/e/../../../../f/../.': '//', 706 } 707 for path, expected in test_vectors.items(): 708 if isinstance(expected, type) and issubclass(expected, Exception): 709 self.assertRaises(expected, 710 server._url_collapse_path, path) 711 else: 712 actual = server._url_collapse_path(path) 713 self.assertEqual(expected, actual, 714 msg='path = %r\nGot: %r\nWanted: %r' % 715 (path, actual, expected)) 716 717 def test_headers_and_content(self): 718 res = self.request('/cgi-bin/file1.py') 719 self.assertEqual( 720 (res.read(), res.getheader('Content-type'), res.status), 721 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK)) 722 723 def test_issue19435(self): 724 res = self.request('///////////nocgi.py/../cgi-bin/nothere.sh') 725 self.assertEqual(res.status, HTTPStatus.NOT_FOUND) 726 727 def test_post(self): 728 params = urllib.parse.urlencode( 729 {'spam' : 1, 'eggs' : 'python', 'bacon' : 123456}) 730 headers = {'Content-type' : 'application/x-www-form-urlencoded'} 731 res = self.request('/cgi-bin/file2.py', 'POST', params, headers) 732 733 self.assertEqual(res.read(), b'1, python, 123456' + self.linesep) 734 735 def test_invaliduri(self): 736 res = self.request('/cgi-bin/invalid') 737 res.read() 738 self.assertEqual(res.status, HTTPStatus.NOT_FOUND) 739 740 def test_authorization(self): 741 headers = {b'Authorization' : b'Basic ' + 742 base64.b64encode(b'username:pass')} 743 res = self.request('/cgi-bin/file1.py', 'GET', headers=headers) 744 self.assertEqual( 745 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 746 (res.read(), res.getheader('Content-type'), res.status)) 747 748 def test_no_leading_slash(self): 749 # http://bugs.python.org/issue2254 750 res = self.request('cgi-bin/file1.py') 751 self.assertEqual( 752 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 753 (res.read(), res.getheader('Content-type'), res.status)) 754 755 def test_os_environ_is_not_altered(self): 756 signature = "Test CGI Server" 757 os.environ['SERVER_SOFTWARE'] = signature 758 res = self.request('/cgi-bin/file1.py') 759 self.assertEqual( 760 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 761 (res.read(), res.getheader('Content-type'), res.status)) 762 self.assertEqual(os.environ['SERVER_SOFTWARE'], signature) 763 764 def test_urlquote_decoding_in_cgi_check(self): 765 res = self.request('/cgi-bin%2ffile1.py') 766 self.assertEqual( 767 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 768 (res.read(), res.getheader('Content-type'), res.status)) 769 770 def test_nested_cgi_path_issue21323(self): 771 res = self.request('/cgi-bin/child-dir/file3.py') 772 self.assertEqual( 773 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 774 (res.read(), res.getheader('Content-type'), res.status)) 775 776 def test_query_with_multiple_question_mark(self): 777 res = self.request('/cgi-bin/file4.py?a=b?c=d') 778 self.assertEqual( 779 (b'a=b?c=d' + self.linesep, 'text/html', HTTPStatus.OK), 780 (res.read(), res.getheader('Content-type'), res.status)) 781 782 def test_query_with_continuous_slashes(self): 783 res = self.request('/cgi-bin/file4.py?k=aa%2F%2Fbb&//q//p//=//a//b//') 784 self.assertEqual( 785 (b'k=aa%2F%2Fbb&//q//p//=//a//b//' + self.linesep, 786 'text/html', HTTPStatus.OK), 787 (res.read(), res.getheader('Content-type'), res.status)) 788 789 790class SocketlessRequestHandler(SimpleHTTPRequestHandler): 791 def __init__(self, *args, **kwargs): 792 request = mock.Mock() 793 request.makefile.return_value = BytesIO() 794 super().__init__(request, None, None) 795 796 self.get_called = False 797 self.protocol_version = "HTTP/1.1" 798 799 def do_GET(self): 800 self.get_called = True 801 self.send_response(HTTPStatus.OK) 802 self.send_header('Content-Type', 'text/html') 803 self.end_headers() 804 self.wfile.write(b'<html><body>Data</body></html>\r\n') 805 806 def log_message(self, format, *args): 807 pass 808 809class RejectingSocketlessRequestHandler(SocketlessRequestHandler): 810 def handle_expect_100(self): 811 self.send_error(HTTPStatus.EXPECTATION_FAILED) 812 return False 813 814 815class AuditableBytesIO: 816 817 def __init__(self): 818 self.datas = [] 819 820 def write(self, data): 821 self.datas.append(data) 822 823 def getData(self): 824 return b''.join(self.datas) 825 826 @property 827 def numWrites(self): 828 return len(self.datas) 829 830 831class BaseHTTPRequestHandlerTestCase(unittest.TestCase): 832 """Test the functionality of the BaseHTTPServer. 833 834 Test the support for the Expect 100-continue header. 835 """ 836 837 HTTPResponseMatch = re.compile(b'HTTP/1.[0-9]+ 200 OK') 838 839 def setUp (self): 840 self.handler = SocketlessRequestHandler() 841 842 def send_typical_request(self, message): 843 input = BytesIO(message) 844 output = BytesIO() 845 self.handler.rfile = input 846 self.handler.wfile = output 847 self.handler.handle_one_request() 848 output.seek(0) 849 return output.readlines() 850 851 def verify_get_called(self): 852 self.assertTrue(self.handler.get_called) 853 854 def verify_expected_headers(self, headers): 855 for fieldName in b'Server: ', b'Date: ', b'Content-Type: ': 856 self.assertEqual(sum(h.startswith(fieldName) for h in headers), 1) 857 858 def verify_http_server_response(self, response): 859 match = self.HTTPResponseMatch.search(response) 860 self.assertIsNotNone(match) 861 862 def test_http_1_1(self): 863 result = self.send_typical_request(b'GET / HTTP/1.1\r\n\r\n') 864 self.verify_http_server_response(result[0]) 865 self.verify_expected_headers(result[1:-1]) 866 self.verify_get_called() 867 self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') 868 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1') 869 self.assertEqual(self.handler.command, 'GET') 870 self.assertEqual(self.handler.path, '/') 871 self.assertEqual(self.handler.request_version, 'HTTP/1.1') 872 self.assertSequenceEqual(self.handler.headers.items(), ()) 873 874 def test_http_1_0(self): 875 result = self.send_typical_request(b'GET / HTTP/1.0\r\n\r\n') 876 self.verify_http_server_response(result[0]) 877 self.verify_expected_headers(result[1:-1]) 878 self.verify_get_called() 879 self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') 880 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.0') 881 self.assertEqual(self.handler.command, 'GET') 882 self.assertEqual(self.handler.path, '/') 883 self.assertEqual(self.handler.request_version, 'HTTP/1.0') 884 self.assertSequenceEqual(self.handler.headers.items(), ()) 885 886 def test_http_0_9(self): 887 result = self.send_typical_request(b'GET / HTTP/0.9\r\n\r\n') 888 self.assertEqual(len(result), 1) 889 self.assertEqual(result[0], b'<html><body>Data</body></html>\r\n') 890 self.verify_get_called() 891 892 def test_extra_space(self): 893 result = self.send_typical_request( 894 b'GET /spaced out HTTP/1.1\r\n' 895 b'Host: dummy\r\n' 896 b'\r\n' 897 ) 898 self.assertTrue(result[0].startswith(b'HTTP/1.1 400 ')) 899 self.verify_expected_headers(result[1:result.index(b'\r\n')]) 900 self.assertFalse(self.handler.get_called) 901 902 def test_with_continue_1_0(self): 903 result = self.send_typical_request(b'GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n') 904 self.verify_http_server_response(result[0]) 905 self.verify_expected_headers(result[1:-1]) 906 self.verify_get_called() 907 self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') 908 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.0') 909 self.assertEqual(self.handler.command, 'GET') 910 self.assertEqual(self.handler.path, '/') 911 self.assertEqual(self.handler.request_version, 'HTTP/1.0') 912 headers = (("Expect", "100-continue"),) 913 self.assertSequenceEqual(self.handler.headers.items(), headers) 914 915 def test_with_continue_1_1(self): 916 result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n') 917 self.assertEqual(result[0], b'HTTP/1.1 100 Continue\r\n') 918 self.assertEqual(result[1], b'\r\n') 919 self.assertEqual(result[2], b'HTTP/1.1 200 OK\r\n') 920 self.verify_expected_headers(result[2:-1]) 921 self.verify_get_called() 922 self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') 923 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1') 924 self.assertEqual(self.handler.command, 'GET') 925 self.assertEqual(self.handler.path, '/') 926 self.assertEqual(self.handler.request_version, 'HTTP/1.1') 927 headers = (("Expect", "100-continue"),) 928 self.assertSequenceEqual(self.handler.headers.items(), headers) 929 930 def test_header_buffering_of_send_error(self): 931 932 input = BytesIO(b'GET / HTTP/1.1\r\n\r\n') 933 output = AuditableBytesIO() 934 handler = SocketlessRequestHandler() 935 handler.rfile = input 936 handler.wfile = output 937 handler.request_version = 'HTTP/1.1' 938 handler.requestline = '' 939 handler.command = None 940 941 handler.send_error(418) 942 self.assertEqual(output.numWrites, 2) 943 944 def test_header_buffering_of_send_response_only(self): 945 946 input = BytesIO(b'GET / HTTP/1.1\r\n\r\n') 947 output = AuditableBytesIO() 948 handler = SocketlessRequestHandler() 949 handler.rfile = input 950 handler.wfile = output 951 handler.request_version = 'HTTP/1.1' 952 953 handler.send_response_only(418) 954 self.assertEqual(output.numWrites, 0) 955 handler.end_headers() 956 self.assertEqual(output.numWrites, 1) 957 958 def test_header_buffering_of_send_header(self): 959 960 input = BytesIO(b'GET / HTTP/1.1\r\n\r\n') 961 output = AuditableBytesIO() 962 handler = SocketlessRequestHandler() 963 handler.rfile = input 964 handler.wfile = output 965 handler.request_version = 'HTTP/1.1' 966 967 handler.send_header('Foo', 'foo') 968 handler.send_header('bar', 'bar') 969 self.assertEqual(output.numWrites, 0) 970 handler.end_headers() 971 self.assertEqual(output.getData(), b'Foo: foo\r\nbar: bar\r\n\r\n') 972 self.assertEqual(output.numWrites, 1) 973 974 def test_header_unbuffered_when_continue(self): 975 976 def _readAndReseek(f): 977 pos = f.tell() 978 f.seek(0) 979 data = f.read() 980 f.seek(pos) 981 return data 982 983 input = BytesIO(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n') 984 output = BytesIO() 985 self.handler.rfile = input 986 self.handler.wfile = output 987 self.handler.request_version = 'HTTP/1.1' 988 989 self.handler.handle_one_request() 990 self.assertNotEqual(_readAndReseek(output), b'') 991 result = _readAndReseek(output).split(b'\r\n') 992 self.assertEqual(result[0], b'HTTP/1.1 100 Continue') 993 self.assertEqual(result[1], b'') 994 self.assertEqual(result[2], b'HTTP/1.1 200 OK') 995 996 def test_with_continue_rejected(self): 997 usual_handler = self.handler # Save to avoid breaking any subsequent tests. 998 self.handler = RejectingSocketlessRequestHandler() 999 result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n') 1000 self.assertEqual(result[0], b'HTTP/1.1 417 Expectation Failed\r\n') 1001 self.verify_expected_headers(result[1:-1]) 1002 # The expect handler should short circuit the usual get method by 1003 # returning false here, so get_called should be false 1004 self.assertFalse(self.handler.get_called) 1005 self.assertEqual(sum(r == b'Connection: close\r\n' for r in result[1:-1]), 1) 1006 self.handler = usual_handler # Restore to avoid breaking any subsequent tests. 1007 1008 def test_request_length(self): 1009 # Issue #10714: huge request lines are discarded, to avoid Denial 1010 # of Service attacks. 1011 result = self.send_typical_request(b'GET ' + b'x' * 65537) 1012 self.assertEqual(result[0], b'HTTP/1.1 414 Request-URI Too Long\r\n') 1013 self.assertFalse(self.handler.get_called) 1014 self.assertIsInstance(self.handler.requestline, str) 1015 1016 def test_header_length(self): 1017 # Issue #6791: same for headers 1018 result = self.send_typical_request( 1019 b'GET / HTTP/1.1\r\nX-Foo: bar' + b'r' * 65537 + b'\r\n\r\n') 1020 self.assertEqual(result[0], b'HTTP/1.1 431 Line too long\r\n') 1021 self.assertFalse(self.handler.get_called) 1022 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1') 1023 1024 def test_too_many_headers(self): 1025 result = self.send_typical_request( 1026 b'GET / HTTP/1.1\r\n' + b'X-Foo: bar\r\n' * 101 + b'\r\n') 1027 self.assertEqual(result[0], b'HTTP/1.1 431 Too many headers\r\n') 1028 self.assertFalse(self.handler.get_called) 1029 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1') 1030 1031 def test_html_escape_on_error(self): 1032 result = self.send_typical_request( 1033 b'<script>alert("hello")</script> / HTTP/1.1') 1034 result = b''.join(result) 1035 text = '<script>alert("hello")</script>' 1036 self.assertIn(html.escape(text, quote=False).encode('ascii'), result) 1037 1038 def test_close_connection(self): 1039 # handle_one_request() should be repeatedly called until 1040 # it sets close_connection 1041 def handle_one_request(): 1042 self.handler.close_connection = next(close_values) 1043 self.handler.handle_one_request = handle_one_request 1044 1045 close_values = iter((True,)) 1046 self.handler.handle() 1047 self.assertRaises(StopIteration, next, close_values) 1048 1049 close_values = iter((False, False, True)) 1050 self.handler.handle() 1051 self.assertRaises(StopIteration, next, close_values) 1052 1053 def test_date_time_string(self): 1054 now = time.time() 1055 # this is the old code that formats the timestamp 1056 year, month, day, hh, mm, ss, wd, y, z = time.gmtime(now) 1057 expected = "%s, %02d %3s %4d %02d:%02d:%02d GMT" % ( 1058 self.handler.weekdayname[wd], 1059 day, 1060 self.handler.monthname[month], 1061 year, hh, mm, ss 1062 ) 1063 self.assertEqual(self.handler.date_time_string(timestamp=now), expected) 1064 1065 1066class SimpleHTTPRequestHandlerTestCase(unittest.TestCase): 1067 """ Test url parsing """ 1068 def setUp(self): 1069 self.translated = os.getcwd() 1070 self.translated = os.path.join(self.translated, 'filename') 1071 self.handler = SocketlessRequestHandler() 1072 1073 def test_query_arguments(self): 1074 path = self.handler.translate_path('/filename') 1075 self.assertEqual(path, self.translated) 1076 path = self.handler.translate_path('/filename?foo=bar') 1077 self.assertEqual(path, self.translated) 1078 path = self.handler.translate_path('/filename?a=b&spam=eggs#zot') 1079 self.assertEqual(path, self.translated) 1080 1081 def test_start_with_double_slash(self): 1082 path = self.handler.translate_path('//filename') 1083 self.assertEqual(path, self.translated) 1084 path = self.handler.translate_path('//filename?foo=bar') 1085 self.assertEqual(path, self.translated) 1086 1087 def test_windows_colon(self): 1088 with support.swap_attr(server.os, 'path', ntpath): 1089 path = self.handler.translate_path('c:c:c:foo/filename') 1090 path = path.replace(ntpath.sep, os.sep) 1091 self.assertEqual(path, self.translated) 1092 1093 path = self.handler.translate_path('\\c:../filename') 1094 path = path.replace(ntpath.sep, os.sep) 1095 self.assertEqual(path, self.translated) 1096 1097 path = self.handler.translate_path('c:\\c:..\\foo/filename') 1098 path = path.replace(ntpath.sep, os.sep) 1099 self.assertEqual(path, self.translated) 1100 1101 path = self.handler.translate_path('c:c:foo\\c:c:bar/filename') 1102 path = path.replace(ntpath.sep, os.sep) 1103 self.assertEqual(path, self.translated) 1104 1105 1106class MiscTestCase(unittest.TestCase): 1107 def test_all(self): 1108 expected = [] 1109 blacklist = {'executable', 'nobody_uid', 'test'} 1110 for name in dir(server): 1111 if name.startswith('_') or name in blacklist: 1112 continue 1113 module_object = getattr(server, name) 1114 if getattr(module_object, '__module__', None) == 'http.server': 1115 expected.append(name) 1116 self.assertCountEqual(server.__all__, expected) 1117 1118 1119def test_main(verbose=None): 1120 cwd = os.getcwd() 1121 try: 1122 support.run_unittest( 1123 RequestHandlerLoggingTestCase, 1124 BaseHTTPRequestHandlerTestCase, 1125 BaseHTTPServerTestCase, 1126 SimpleHTTPServerTestCase, 1127 CGIHTTPServerTestCase, 1128 SimpleHTTPRequestHandlerTestCase, 1129 MiscTestCase, 1130 ) 1131 finally: 1132 os.chdir(cwd) 1133 1134if __name__ == '__main__': 1135 test_main() 1136