1"""Unittests for the various HTTPServer modules. 2 3Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>, 4Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest. 5""" 6from collections import OrderedDict 7from http.server import BaseHTTPRequestHandler, HTTPServer, \ 8 SimpleHTTPRequestHandler, CGIHTTPRequestHandler 9from http import server, HTTPStatus 10 11import os 12import socket 13import sys 14import re 15import base64 16import ntpath 17import pathlib 18import shutil 19import email.message 20import email.utils 21import html 22import http, http.client 23import urllib.parse 24import tempfile 25import time 26import datetime 27import threading 28from unittest import mock 29from io import BytesIO 30 31import unittest 32from test import support 33from test.support import os_helper 34from test.support import threading_helper 35 36 37class NoLogRequestHandler: 38 def log_message(self, *args): 39 # don't write log messages to stderr 40 pass 41 42 def read(self, n=None): 43 return '' 44 45 46class TestServerThread(threading.Thread): 47 def __init__(self, test_object, request_handler): 48 threading.Thread.__init__(self) 49 self.request_handler = request_handler 50 self.test_object = test_object 51 52 def run(self): 53 self.server = HTTPServer(('localhost', 0), self.request_handler) 54 self.test_object.HOST, self.test_object.PORT = self.server.socket.getsockname() 55 self.test_object.server_started.set() 56 self.test_object = None 57 try: 58 self.server.serve_forever(0.05) 59 finally: 60 self.server.server_close() 61 62 def stop(self): 63 self.server.shutdown() 64 self.join() 65 66 67class BaseTestCase(unittest.TestCase): 68 def setUp(self): 69 self._threads = threading_helper.threading_setup() 70 os.environ = os_helper.EnvironmentVarGuard() 71 self.server_started = threading.Event() 72 self.thread = TestServerThread(self, self.request_handler) 73 self.thread.start() 74 self.server_started.wait() 75 76 def tearDown(self): 77 self.thread.stop() 78 self.thread = None 79 os.environ.__exit__() 80 threading_helper.threading_cleanup(*self._threads) 81 82 def request(self, uri, method='GET', body=None, headers={}): 83 self.connection = http.client.HTTPConnection(self.HOST, self.PORT) 84 self.connection.request(method, uri, body, headers) 85 return self.connection.getresponse() 86 87 88class BaseHTTPServerTestCase(BaseTestCase): 89 class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler): 90 protocol_version = 'HTTP/1.1' 91 default_request_version = 'HTTP/1.1' 92 93 def do_TEST(self): 94 self.send_response(HTTPStatus.NO_CONTENT) 95 self.send_header('Content-Type', 'text/html') 96 self.send_header('Connection', 'close') 97 self.end_headers() 98 99 def do_KEEP(self): 100 self.send_response(HTTPStatus.NO_CONTENT) 101 self.send_header('Content-Type', 'text/html') 102 self.send_header('Connection', 'keep-alive') 103 self.end_headers() 104 105 def do_KEYERROR(self): 106 self.send_error(999) 107 108 def do_NOTFOUND(self): 109 self.send_error(HTTPStatus.NOT_FOUND) 110 111 def do_EXPLAINERROR(self): 112 self.send_error(999, "Short Message", 113 "This is a long \n explanation") 114 115 def do_CUSTOM(self): 116 self.send_response(999) 117 self.send_header('Content-Type', 'text/html') 118 self.send_header('Connection', 'close') 119 self.end_headers() 120 121 def do_LATINONEHEADER(self): 122 self.send_response(999) 123 self.send_header('X-Special', 'Dängerous Mind') 124 self.send_header('Connection', 'close') 125 self.end_headers() 126 body = self.headers['x-special-incoming'].encode('utf-8') 127 self.wfile.write(body) 128 129 def do_SEND_ERROR(self): 130 self.send_error(int(self.path[1:])) 131 132 def do_HEAD(self): 133 self.send_error(int(self.path[1:])) 134 135 def setUp(self): 136 BaseTestCase.setUp(self) 137 self.con = http.client.HTTPConnection(self.HOST, self.PORT) 138 self.con.connect() 139 140 def test_command(self): 141 self.con.request('GET', '/') 142 res = self.con.getresponse() 143 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 144 145 def test_request_line_trimming(self): 146 self.con._http_vsn_str = 'HTTP/1.1\n' 147 self.con.putrequest('XYZBOGUS', '/') 148 self.con.endheaders() 149 res = self.con.getresponse() 150 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 151 152 def test_version_bogus(self): 153 self.con._http_vsn_str = 'FUBAR' 154 self.con.putrequest('GET', '/') 155 self.con.endheaders() 156 res = self.con.getresponse() 157 self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) 158 159 def test_version_digits(self): 160 self.con._http_vsn_str = 'HTTP/9.9.9' 161 self.con.putrequest('GET', '/') 162 self.con.endheaders() 163 res = self.con.getresponse() 164 self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) 165 166 def test_version_none_get(self): 167 self.con._http_vsn_str = '' 168 self.con.putrequest('GET', '/') 169 self.con.endheaders() 170 res = self.con.getresponse() 171 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 172 173 def test_version_none(self): 174 # Test that a valid method is rejected when not HTTP/1.x 175 self.con._http_vsn_str = '' 176 self.con.putrequest('CUSTOM', '/') 177 self.con.endheaders() 178 res = self.con.getresponse() 179 self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) 180 181 def test_version_invalid(self): 182 self.con._http_vsn = 99 183 self.con._http_vsn_str = 'HTTP/9.9' 184 self.con.putrequest('GET', '/') 185 self.con.endheaders() 186 res = self.con.getresponse() 187 self.assertEqual(res.status, HTTPStatus.HTTP_VERSION_NOT_SUPPORTED) 188 189 def test_send_blank(self): 190 self.con._http_vsn_str = '' 191 self.con.putrequest('', '') 192 self.con.endheaders() 193 res = self.con.getresponse() 194 self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) 195 196 def test_header_close(self): 197 self.con.putrequest('GET', '/') 198 self.con.putheader('Connection', 'close') 199 self.con.endheaders() 200 res = self.con.getresponse() 201 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 202 203 def test_header_keep_alive(self): 204 self.con._http_vsn_str = 'HTTP/1.1' 205 self.con.putrequest('GET', '/') 206 self.con.putheader('Connection', 'keep-alive') 207 self.con.endheaders() 208 res = self.con.getresponse() 209 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 210 211 def test_handler(self): 212 self.con.request('TEST', '/') 213 res = self.con.getresponse() 214 self.assertEqual(res.status, HTTPStatus.NO_CONTENT) 215 216 def test_return_header_keep_alive(self): 217 self.con.request('KEEP', '/') 218 res = self.con.getresponse() 219 self.assertEqual(res.getheader('Connection'), 'keep-alive') 220 self.con.request('TEST', '/') 221 self.addCleanup(self.con.close) 222 223 def test_internal_key_error(self): 224 self.con.request('KEYERROR', '/') 225 res = self.con.getresponse() 226 self.assertEqual(res.status, 999) 227 228 def test_return_custom_status(self): 229 self.con.request('CUSTOM', '/') 230 res = self.con.getresponse() 231 self.assertEqual(res.status, 999) 232 233 def test_return_explain_error(self): 234 self.con.request('EXPLAINERROR', '/') 235 res = self.con.getresponse() 236 self.assertEqual(res.status, 999) 237 self.assertTrue(int(res.getheader('Content-Length'))) 238 239 def test_latin1_header(self): 240 self.con.request('LATINONEHEADER', '/', headers={ 241 'X-Special-Incoming': 'Ärger mit Unicode' 242 }) 243 res = self.con.getresponse() 244 self.assertEqual(res.getheader('X-Special'), 'Dängerous Mind') 245 self.assertEqual(res.read(), 'Ärger mit Unicode'.encode('utf-8')) 246 247 def test_error_content_length(self): 248 # Issue #16088: standard error responses should have a content-length 249 self.con.request('NOTFOUND', '/') 250 res = self.con.getresponse() 251 self.assertEqual(res.status, HTTPStatus.NOT_FOUND) 252 253 data = res.read() 254 self.assertEqual(int(res.getheader('Content-Length')), len(data)) 255 256 def test_send_error(self): 257 allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED, 258 HTTPStatus.RESET_CONTENT) 259 for code in (HTTPStatus.NO_CONTENT, HTTPStatus.NOT_MODIFIED, 260 HTTPStatus.PROCESSING, HTTPStatus.RESET_CONTENT, 261 HTTPStatus.SWITCHING_PROTOCOLS): 262 self.con.request('SEND_ERROR', '/{}'.format(code)) 263 res = self.con.getresponse() 264 self.assertEqual(code, res.status) 265 self.assertEqual(None, res.getheader('Content-Length')) 266 self.assertEqual(None, res.getheader('Content-Type')) 267 if code not in allow_transfer_encoding_codes: 268 self.assertEqual(None, res.getheader('Transfer-Encoding')) 269 270 data = res.read() 271 self.assertEqual(b'', data) 272 273 def test_head_via_send_error(self): 274 allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED, 275 HTTPStatus.RESET_CONTENT) 276 for code in (HTTPStatus.OK, HTTPStatus.NO_CONTENT, 277 HTTPStatus.NOT_MODIFIED, HTTPStatus.RESET_CONTENT, 278 HTTPStatus.SWITCHING_PROTOCOLS): 279 self.con.request('HEAD', '/{}'.format(code)) 280 res = self.con.getresponse() 281 self.assertEqual(code, res.status) 282 if code == HTTPStatus.OK: 283 self.assertTrue(int(res.getheader('Content-Length')) > 0) 284 self.assertIn('text/html', res.getheader('Content-Type')) 285 else: 286 self.assertEqual(None, res.getheader('Content-Length')) 287 self.assertEqual(None, res.getheader('Content-Type')) 288 if code not in allow_transfer_encoding_codes: 289 self.assertEqual(None, res.getheader('Transfer-Encoding')) 290 291 data = res.read() 292 self.assertEqual(b'', data) 293 294 295class RequestHandlerLoggingTestCase(BaseTestCase): 296 class request_handler(BaseHTTPRequestHandler): 297 protocol_version = 'HTTP/1.1' 298 default_request_version = 'HTTP/1.1' 299 300 def do_GET(self): 301 self.send_response(HTTPStatus.OK) 302 self.end_headers() 303 304 def do_ERROR(self): 305 self.send_error(HTTPStatus.NOT_FOUND, 'File not found') 306 307 def test_get(self): 308 self.con = http.client.HTTPConnection(self.HOST, self.PORT) 309 self.con.connect() 310 311 with support.captured_stderr() as err: 312 self.con.request('GET', '/') 313 self.con.getresponse() 314 315 self.assertTrue( 316 err.getvalue().endswith('"GET / HTTP/1.1" 200 -\n')) 317 318 def test_err(self): 319 self.con = http.client.HTTPConnection(self.HOST, self.PORT) 320 self.con.connect() 321 322 with support.captured_stderr() as err: 323 self.con.request('ERROR', '/') 324 self.con.getresponse() 325 326 lines = err.getvalue().split('\n') 327 self.assertTrue(lines[0].endswith('code 404, message File not found')) 328 self.assertTrue(lines[1].endswith('"ERROR / HTTP/1.1" 404 -')) 329 330 331class SimpleHTTPServerTestCase(BaseTestCase): 332 class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler): 333 pass 334 335 def setUp(self): 336 super().setUp() 337 self.cwd = os.getcwd() 338 basetempdir = tempfile.gettempdir() 339 os.chdir(basetempdir) 340 self.data = b'We are the knights who say Ni!' 341 self.tempdir = tempfile.mkdtemp(dir=basetempdir) 342 self.tempdir_name = os.path.basename(self.tempdir) 343 self.base_url = '/' + self.tempdir_name 344 tempname = os.path.join(self.tempdir, 'test') 345 with open(tempname, 'wb') as temp: 346 temp.write(self.data) 347 temp.flush() 348 mtime = os.stat(tempname).st_mtime 349 # compute last modification datetime for browser cache tests 350 last_modif = datetime.datetime.fromtimestamp(mtime, 351 datetime.timezone.utc) 352 self.last_modif_datetime = last_modif.replace(microsecond=0) 353 self.last_modif_header = email.utils.formatdate( 354 last_modif.timestamp(), usegmt=True) 355 356 def tearDown(self): 357 try: 358 os.chdir(self.cwd) 359 try: 360 shutil.rmtree(self.tempdir) 361 except: 362 pass 363 finally: 364 super().tearDown() 365 366 def check_status_and_reason(self, response, status, data=None): 367 def close_conn(): 368 """Don't close reader yet so we can check if there was leftover 369 buffered input""" 370 nonlocal reader 371 reader = response.fp 372 response.fp = None 373 reader = None 374 response._close_conn = close_conn 375 376 body = response.read() 377 self.assertTrue(response) 378 self.assertEqual(response.status, status) 379 self.assertIsNotNone(response.reason) 380 if data: 381 self.assertEqual(data, body) 382 # Ensure the server has not set up a persistent connection, and has 383 # not sent any extra data 384 self.assertEqual(response.version, 10) 385 self.assertEqual(response.msg.get("Connection", "close"), "close") 386 self.assertEqual(reader.read(30), b'', 'Connection should be closed') 387 388 reader.close() 389 return body 390 391 @unittest.skipIf(sys.platform == 'darwin', 392 'undecodable name cannot always be decoded on macOS') 393 @unittest.skipIf(sys.platform == 'win32', 394 'undecodable name cannot be decoded on win32') 395 @unittest.skipUnless(os_helper.TESTFN_UNDECODABLE, 396 'need os_helper.TESTFN_UNDECODABLE') 397 def test_undecodable_filename(self): 398 enc = sys.getfilesystemencoding() 399 filename = os.fsdecode(os_helper.TESTFN_UNDECODABLE) + '.txt' 400 with open(os.path.join(self.tempdir, filename), 'wb') as f: 401 f.write(os_helper.TESTFN_UNDECODABLE) 402 response = self.request(self.base_url + '/') 403 if sys.platform == 'darwin': 404 # On Mac OS the HFS+ filesystem replaces bytes that aren't valid 405 # UTF-8 into a percent-encoded value. 406 for name in os.listdir(self.tempdir): 407 if name != 'test': # Ignore a filename created in setUp(). 408 filename = name 409 break 410 body = self.check_status_and_reason(response, HTTPStatus.OK) 411 quotedname = urllib.parse.quote(filename, errors='surrogatepass') 412 self.assertIn(('href="%s"' % quotedname) 413 .encode(enc, 'surrogateescape'), body) 414 self.assertIn(('>%s<' % html.escape(filename, quote=False)) 415 .encode(enc, 'surrogateescape'), body) 416 response = self.request(self.base_url + '/' + quotedname) 417 self.check_status_and_reason(response, HTTPStatus.OK, 418 data=os_helper.TESTFN_UNDECODABLE) 419 420 def test_get_dir_redirect_location_domain_injection_bug(self): 421 """Ensure //evil.co/..%2f../../X does not put //evil.co/ in Location. 422 423 //netloc/ in a Location header is a redirect to a new host. 424 https://github.com/python/cpython/issues/87389 425 426 This checks that a path resolving to a directory on our server cannot 427 resolve into a redirect to another server. 428 """ 429 os.mkdir(os.path.join(self.tempdir, 'existing_directory')) 430 url = f'/python.org/..%2f..%2f..%2f..%2f..%2f../%0a%0d/../{self.tempdir_name}/existing_directory' 431 expected_location = f'{url}/' # /python.org.../ single slash single prefix, trailing slash 432 # Canonicalizes to /tmp/tempdir_name/existing_directory which does 433 # exist and is a dir, triggering the 301 redirect logic. 434 response = self.request(url) 435 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 436 location = response.getheader('Location') 437 self.assertEqual(location, expected_location, msg='non-attack failed!') 438 439 # //python.org... multi-slash prefix, no trailing slash 440 attack_url = f'/{url}' 441 response = self.request(attack_url) 442 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 443 location = response.getheader('Location') 444 self.assertFalse(location.startswith('//'), msg=location) 445 self.assertEqual(location, expected_location, 446 msg='Expected Location header to start with a single / and ' 447 'end with a / as this is a directory redirect.') 448 449 # ///python.org... triple-slash prefix, no trailing slash 450 attack3_url = f'//{url}' 451 response = self.request(attack3_url) 452 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 453 self.assertEqual(response.getheader('Location'), expected_location) 454 455 # If the second word in the http request (Request-URI for the http 456 # method) is a full URI, we don't worry about it, as that'll be parsed 457 # and reassembled as a full URI within BaseHTTPRequestHandler.send_head 458 # so no errant scheme-less //netloc//evil.co/ domain mixup can happen. 459 attack_scheme_netloc_2slash_url = f'https://pypi.org/{url}' 460 expected_scheme_netloc_location = f'{attack_scheme_netloc_2slash_url}/' 461 response = self.request(attack_scheme_netloc_2slash_url) 462 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 463 location = response.getheader('Location') 464 # We're just ensuring that the scheme and domain make it through, if 465 # there are or aren't multiple slashes at the start of the path that 466 # follows that isn't important in this Location: header. 467 self.assertTrue(location.startswith('https://pypi.org/'), msg=location) 468 469 def test_get(self): 470 #constructs the path relative to the root directory of the HTTPServer 471 response = self.request(self.base_url + '/test') 472 self.check_status_and_reason(response, HTTPStatus.OK, data=self.data) 473 # check for trailing "/" which should return 404. See Issue17324 474 response = self.request(self.base_url + '/test/') 475 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 476 response = self.request(self.base_url + '/') 477 self.check_status_and_reason(response, HTTPStatus.OK) 478 response = self.request(self.base_url) 479 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 480 self.assertEqual(response.getheader("Content-Length"), "0") 481 response = self.request(self.base_url + '/?hi=2') 482 self.check_status_and_reason(response, HTTPStatus.OK) 483 response = self.request(self.base_url + '?hi=1') 484 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 485 self.assertEqual(response.getheader("Location"), 486 self.base_url + "/?hi=1") 487 response = self.request('/ThisDoesNotExist') 488 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 489 response = self.request('/' + 'ThisDoesNotExist' + '/') 490 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 491 492 data = b"Dummy index file\r\n" 493 with open(os.path.join(self.tempdir_name, 'index.html'), 'wb') as f: 494 f.write(data) 495 response = self.request(self.base_url + '/') 496 self.check_status_and_reason(response, HTTPStatus.OK, data) 497 498 # chmod() doesn't work as expected on Windows, and filesystem 499 # permissions are ignored by root on Unix. 500 if os.name == 'posix' and os.geteuid() != 0: 501 os.chmod(self.tempdir, 0) 502 try: 503 response = self.request(self.base_url + '/') 504 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 505 finally: 506 os.chmod(self.tempdir, 0o755) 507 508 def test_head(self): 509 response = self.request( 510 self.base_url + '/test', method='HEAD') 511 self.check_status_and_reason(response, HTTPStatus.OK) 512 self.assertEqual(response.getheader('content-length'), 513 str(len(self.data))) 514 self.assertEqual(response.getheader('content-type'), 515 'application/octet-stream') 516 517 def test_browser_cache(self): 518 """Check that when a request to /test is sent with the request header 519 If-Modified-Since set to date of last modification, the server returns 520 status code 304, not 200 521 """ 522 headers = email.message.Message() 523 headers['If-Modified-Since'] = self.last_modif_header 524 response = self.request(self.base_url + '/test', headers=headers) 525 self.check_status_and_reason(response, HTTPStatus.NOT_MODIFIED) 526 527 # one hour after last modification : must return 304 528 new_dt = self.last_modif_datetime + datetime.timedelta(hours=1) 529 headers = email.message.Message() 530 headers['If-Modified-Since'] = email.utils.format_datetime(new_dt, 531 usegmt=True) 532 response = self.request(self.base_url + '/test', headers=headers) 533 self.check_status_and_reason(response, HTTPStatus.NOT_MODIFIED) 534 535 def test_browser_cache_file_changed(self): 536 # with If-Modified-Since earlier than Last-Modified, must return 200 537 dt = self.last_modif_datetime 538 # build datetime object : 365 days before last modification 539 old_dt = dt - datetime.timedelta(days=365) 540 headers = email.message.Message() 541 headers['If-Modified-Since'] = email.utils.format_datetime(old_dt, 542 usegmt=True) 543 response = self.request(self.base_url + '/test', headers=headers) 544 self.check_status_and_reason(response, HTTPStatus.OK) 545 546 def test_browser_cache_with_If_None_Match_header(self): 547 # if If-None-Match header is present, ignore If-Modified-Since 548 549 headers = email.message.Message() 550 headers['If-Modified-Since'] = self.last_modif_header 551 headers['If-None-Match'] = "*" 552 response = self.request(self.base_url + '/test', headers=headers) 553 self.check_status_and_reason(response, HTTPStatus.OK) 554 555 def test_invalid_requests(self): 556 response = self.request('/', method='FOO') 557 self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED) 558 # requests must be case sensitive,so this should fail too 559 response = self.request('/', method='custom') 560 self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED) 561 response = self.request('/', method='GETs') 562 self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED) 563 564 def test_last_modified(self): 565 """Checks that the datetime returned in Last-Modified response header 566 is the actual datetime of last modification, rounded to the second 567 """ 568 response = self.request(self.base_url + '/test') 569 self.check_status_and_reason(response, HTTPStatus.OK, data=self.data) 570 last_modif_header = response.headers['Last-modified'] 571 self.assertEqual(last_modif_header, self.last_modif_header) 572 573 def test_path_without_leading_slash(self): 574 response = self.request(self.tempdir_name + '/test') 575 self.check_status_and_reason(response, HTTPStatus.OK, data=self.data) 576 response = self.request(self.tempdir_name + '/test/') 577 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 578 response = self.request(self.tempdir_name + '/') 579 self.check_status_and_reason(response, HTTPStatus.OK) 580 response = self.request(self.tempdir_name) 581 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 582 response = self.request(self.tempdir_name + '/?hi=2') 583 self.check_status_and_reason(response, HTTPStatus.OK) 584 response = self.request(self.tempdir_name + '?hi=1') 585 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 586 self.assertEqual(response.getheader("Location"), 587 self.tempdir_name + "/?hi=1") 588 589 def test_html_escape_filename(self): 590 filename = '<test&>.txt' 591 fullpath = os.path.join(self.tempdir, filename) 592 593 try: 594 open(fullpath, 'wb').close() 595 except OSError: 596 raise unittest.SkipTest('Can not create file %s on current file ' 597 'system' % filename) 598 599 try: 600 response = self.request(self.base_url + '/') 601 body = self.check_status_and_reason(response, HTTPStatus.OK) 602 enc = response.headers.get_content_charset() 603 finally: 604 os.unlink(fullpath) # avoid affecting test_undecodable_filename 605 606 self.assertIsNotNone(enc) 607 html_text = '>%s<' % html.escape(filename, quote=False) 608 self.assertIn(html_text.encode(enc), body) 609 610 611cgi_file1 = """\ 612#!%s 613 614print("Content-type: text/html") 615print() 616print("Hello World") 617""" 618 619cgi_file2 = """\ 620#!%s 621import cgi 622 623print("Content-type: text/html") 624print() 625 626form = cgi.FieldStorage() 627print("%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"), 628 form.getfirst("bacon"))) 629""" 630 631cgi_file4 = """\ 632#!%s 633import os 634 635print("Content-type: text/html") 636print() 637 638print(os.environ["%s"]) 639""" 640 641cgi_file6 = """\ 642#!%s 643import os 644 645print("X-ambv: was here") 646print("Content-type: text/html") 647print() 648print("<pre>") 649for k, v in os.environ.items(): 650 try: 651 k.encode('ascii') 652 v.encode('ascii') 653 except UnicodeEncodeError: 654 continue # see: BPO-44647 655 print(f"{k}={v}") 656print("</pre>") 657""" 658 659 660@unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0, 661 "This test can't be run reliably as root (issue #13308).") 662class CGIHTTPServerTestCase(BaseTestCase): 663 class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler): 664 pass 665 666 linesep = os.linesep.encode('ascii') 667 668 def setUp(self): 669 BaseTestCase.setUp(self) 670 self.cwd = os.getcwd() 671 self.parent_dir = tempfile.mkdtemp() 672 self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin') 673 self.cgi_child_dir = os.path.join(self.cgi_dir, 'child-dir') 674 self.sub_dir_1 = os.path.join(self.parent_dir, 'sub') 675 self.sub_dir_2 = os.path.join(self.sub_dir_1, 'dir') 676 self.cgi_dir_in_sub_dir = os.path.join(self.sub_dir_2, 'cgi-bin') 677 os.mkdir(self.cgi_dir) 678 os.mkdir(self.cgi_child_dir) 679 os.mkdir(self.sub_dir_1) 680 os.mkdir(self.sub_dir_2) 681 os.mkdir(self.cgi_dir_in_sub_dir) 682 self.nocgi_path = None 683 self.file1_path = None 684 self.file2_path = None 685 self.file3_path = None 686 self.file4_path = None 687 self.file5_path = None 688 689 # The shebang line should be pure ASCII: use symlink if possible. 690 # See issue #7668. 691 self._pythonexe_symlink = None 692 if os_helper.can_symlink(): 693 self.pythonexe = os.path.join(self.parent_dir, 'python') 694 self._pythonexe_symlink = support.PythonSymlink(self.pythonexe).__enter__() 695 else: 696 self.pythonexe = sys.executable 697 698 try: 699 # The python executable path is written as the first line of the 700 # CGI Python script. The encoding cookie cannot be used, and so the 701 # path should be encodable to the default script encoding (utf-8) 702 self.pythonexe.encode('utf-8') 703 except UnicodeEncodeError: 704 self.tearDown() 705 self.skipTest("Python executable path is not encodable to utf-8") 706 707 self.nocgi_path = os.path.join(self.parent_dir, 'nocgi.py') 708 with open(self.nocgi_path, 'w', encoding='utf-8') as fp: 709 fp.write(cgi_file1 % self.pythonexe) 710 os.chmod(self.nocgi_path, 0o777) 711 712 self.file1_path = os.path.join(self.cgi_dir, 'file1.py') 713 with open(self.file1_path, 'w', encoding='utf-8') as file1: 714 file1.write(cgi_file1 % self.pythonexe) 715 os.chmod(self.file1_path, 0o777) 716 717 self.file2_path = os.path.join(self.cgi_dir, 'file2.py') 718 with open(self.file2_path, 'w', encoding='utf-8') as file2: 719 file2.write(cgi_file2 % self.pythonexe) 720 os.chmod(self.file2_path, 0o777) 721 722 self.file3_path = os.path.join(self.cgi_child_dir, 'file3.py') 723 with open(self.file3_path, 'w', encoding='utf-8') as file3: 724 file3.write(cgi_file1 % self.pythonexe) 725 os.chmod(self.file3_path, 0o777) 726 727 self.file4_path = os.path.join(self.cgi_dir, 'file4.py') 728 with open(self.file4_path, 'w', encoding='utf-8') as file4: 729 file4.write(cgi_file4 % (self.pythonexe, 'QUERY_STRING')) 730 os.chmod(self.file4_path, 0o777) 731 732 self.file5_path = os.path.join(self.cgi_dir_in_sub_dir, 'file5.py') 733 with open(self.file5_path, 'w', encoding='utf-8') as file5: 734 file5.write(cgi_file1 % self.pythonexe) 735 os.chmod(self.file5_path, 0o777) 736 737 self.file6_path = os.path.join(self.cgi_dir, 'file6.py') 738 with open(self.file6_path, 'w', encoding='utf-8') as file6: 739 file6.write(cgi_file6 % self.pythonexe) 740 os.chmod(self.file6_path, 0o777) 741 742 os.chdir(self.parent_dir) 743 744 def tearDown(self): 745 try: 746 os.chdir(self.cwd) 747 if self._pythonexe_symlink: 748 self._pythonexe_symlink.__exit__(None, None, None) 749 if self.nocgi_path: 750 os.remove(self.nocgi_path) 751 if self.file1_path: 752 os.remove(self.file1_path) 753 if self.file2_path: 754 os.remove(self.file2_path) 755 if self.file3_path: 756 os.remove(self.file3_path) 757 if self.file4_path: 758 os.remove(self.file4_path) 759 if self.file5_path: 760 os.remove(self.file5_path) 761 if self.file6_path: 762 os.remove(self.file6_path) 763 os.rmdir(self.cgi_child_dir) 764 os.rmdir(self.cgi_dir) 765 os.rmdir(self.cgi_dir_in_sub_dir) 766 os.rmdir(self.sub_dir_2) 767 os.rmdir(self.sub_dir_1) 768 os.rmdir(self.parent_dir) 769 finally: 770 BaseTestCase.tearDown(self) 771 772 def test_url_collapse_path(self): 773 # verify tail is the last portion and head is the rest on proper urls 774 test_vectors = { 775 '': '//', 776 '..': IndexError, 777 '/.//..': IndexError, 778 '/': '//', 779 '//': '//', 780 '/\\': '//\\', 781 '/.//': '//', 782 'cgi-bin/file1.py': '/cgi-bin/file1.py', 783 '/cgi-bin/file1.py': '/cgi-bin/file1.py', 784 'a': '//a', 785 '/a': '//a', 786 '//a': '//a', 787 './a': '//a', 788 './C:/': '/C:/', 789 '/a/b': '/a/b', 790 '/a/b/': '/a/b/', 791 '/a/b/.': '/a/b/', 792 '/a/b/c/..': '/a/b/', 793 '/a/b/c/../d': '/a/b/d', 794 '/a/b/c/../d/e/../f': '/a/b/d/f', 795 '/a/b/c/../d/e/../../f': '/a/b/f', 796 '/a/b/c/../d/e/.././././..//f': '/a/b/f', 797 '../a/b/c/../d/e/.././././..//f': IndexError, 798 '/a/b/c/../d/e/../../../f': '/a/f', 799 '/a/b/c/../d/e/../../../../f': '//f', 800 '/a/b/c/../d/e/../../../../../f': IndexError, 801 '/a/b/c/../d/e/../../../../f/..': '//', 802 '/a/b/c/../d/e/../../../../f/../.': '//', 803 } 804 for path, expected in test_vectors.items(): 805 if isinstance(expected, type) and issubclass(expected, Exception): 806 self.assertRaises(expected, 807 server._url_collapse_path, path) 808 else: 809 actual = server._url_collapse_path(path) 810 self.assertEqual(expected, actual, 811 msg='path = %r\nGot: %r\nWanted: %r' % 812 (path, actual, expected)) 813 814 def test_headers_and_content(self): 815 res = self.request('/cgi-bin/file1.py') 816 self.assertEqual( 817 (res.read(), res.getheader('Content-type'), res.status), 818 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK)) 819 820 def test_issue19435(self): 821 res = self.request('///////////nocgi.py/../cgi-bin/nothere.sh') 822 self.assertEqual(res.status, HTTPStatus.NOT_FOUND) 823 824 def test_post(self): 825 params = urllib.parse.urlencode( 826 {'spam' : 1, 'eggs' : 'python', 'bacon' : 123456}) 827 headers = {'Content-type' : 'application/x-www-form-urlencoded'} 828 res = self.request('/cgi-bin/file2.py', 'POST', params, headers) 829 830 self.assertEqual(res.read(), b'1, python, 123456' + self.linesep) 831 832 def test_invaliduri(self): 833 res = self.request('/cgi-bin/invalid') 834 res.read() 835 self.assertEqual(res.status, HTTPStatus.NOT_FOUND) 836 837 def test_authorization(self): 838 headers = {b'Authorization' : b'Basic ' + 839 base64.b64encode(b'username:pass')} 840 res = self.request('/cgi-bin/file1.py', 'GET', headers=headers) 841 self.assertEqual( 842 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 843 (res.read(), res.getheader('Content-type'), res.status)) 844 845 def test_no_leading_slash(self): 846 # http://bugs.python.org/issue2254 847 res = self.request('cgi-bin/file1.py') 848 self.assertEqual( 849 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 850 (res.read(), res.getheader('Content-type'), res.status)) 851 852 def test_os_environ_is_not_altered(self): 853 signature = "Test CGI Server" 854 os.environ['SERVER_SOFTWARE'] = signature 855 res = self.request('/cgi-bin/file1.py') 856 self.assertEqual( 857 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 858 (res.read(), res.getheader('Content-type'), res.status)) 859 self.assertEqual(os.environ['SERVER_SOFTWARE'], signature) 860 861 def test_urlquote_decoding_in_cgi_check(self): 862 res = self.request('/cgi-bin%2ffile1.py') 863 self.assertEqual( 864 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 865 (res.read(), res.getheader('Content-type'), res.status)) 866 867 def test_nested_cgi_path_issue21323(self): 868 res = self.request('/cgi-bin/child-dir/file3.py') 869 self.assertEqual( 870 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 871 (res.read(), res.getheader('Content-type'), res.status)) 872 873 def test_query_with_multiple_question_mark(self): 874 res = self.request('/cgi-bin/file4.py?a=b?c=d') 875 self.assertEqual( 876 (b'a=b?c=d' + self.linesep, 'text/html', HTTPStatus.OK), 877 (res.read(), res.getheader('Content-type'), res.status)) 878 879 def test_query_with_continuous_slashes(self): 880 res = self.request('/cgi-bin/file4.py?k=aa%2F%2Fbb&//q//p//=//a//b//') 881 self.assertEqual( 882 (b'k=aa%2F%2Fbb&//q//p//=//a//b//' + self.linesep, 883 'text/html', HTTPStatus.OK), 884 (res.read(), res.getheader('Content-type'), res.status)) 885 886 def test_cgi_path_in_sub_directories(self): 887 try: 888 CGIHTTPRequestHandler.cgi_directories.append('/sub/dir/cgi-bin') 889 res = self.request('/sub/dir/cgi-bin/file5.py') 890 self.assertEqual( 891 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 892 (res.read(), res.getheader('Content-type'), res.status)) 893 finally: 894 CGIHTTPRequestHandler.cgi_directories.remove('/sub/dir/cgi-bin') 895 896 def test_accept(self): 897 browser_accept = \ 898 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8' 899 tests = ( 900 ((('Accept', browser_accept),), browser_accept), 901 ((), ''), 902 # Hack case to get two values for the one header 903 ((('Accept', 'text/html'), ('ACCEPT', 'text/plain')), 904 'text/html,text/plain'), 905 ) 906 for headers, expected in tests: 907 headers = OrderedDict(headers) 908 with self.subTest(headers): 909 res = self.request('/cgi-bin/file6.py', 'GET', headers=headers) 910 self.assertEqual(http.HTTPStatus.OK, res.status) 911 expected = f"HTTP_ACCEPT={expected}".encode('ascii') 912 self.assertIn(expected, res.read()) 913 914 915class SocketlessRequestHandler(SimpleHTTPRequestHandler): 916 def __init__(self, directory=None): 917 request = mock.Mock() 918 request.makefile.return_value = BytesIO() 919 super().__init__(request, None, None, directory=directory) 920 921 self.get_called = False 922 self.protocol_version = "HTTP/1.1" 923 924 def do_GET(self): 925 self.get_called = True 926 self.send_response(HTTPStatus.OK) 927 self.send_header('Content-Type', 'text/html') 928 self.end_headers() 929 self.wfile.write(b'<html><body>Data</body></html>\r\n') 930 931 def log_message(self, format, *args): 932 pass 933 934class RejectingSocketlessRequestHandler(SocketlessRequestHandler): 935 def handle_expect_100(self): 936 self.send_error(HTTPStatus.EXPECTATION_FAILED) 937 return False 938 939 940class AuditableBytesIO: 941 942 def __init__(self): 943 self.datas = [] 944 945 def write(self, data): 946 self.datas.append(data) 947 948 def getData(self): 949 return b''.join(self.datas) 950 951 @property 952 def numWrites(self): 953 return len(self.datas) 954 955 956class BaseHTTPRequestHandlerTestCase(unittest.TestCase): 957 """Test the functionality of the BaseHTTPServer. 958 959 Test the support for the Expect 100-continue header. 960 """ 961 962 HTTPResponseMatch = re.compile(b'HTTP/1.[0-9]+ 200 OK') 963 964 def setUp (self): 965 self.handler = SocketlessRequestHandler() 966 967 def send_typical_request(self, message): 968 input = BytesIO(message) 969 output = BytesIO() 970 self.handler.rfile = input 971 self.handler.wfile = output 972 self.handler.handle_one_request() 973 output.seek(0) 974 return output.readlines() 975 976 def verify_get_called(self): 977 self.assertTrue(self.handler.get_called) 978 979 def verify_expected_headers(self, headers): 980 for fieldName in b'Server: ', b'Date: ', b'Content-Type: ': 981 self.assertEqual(sum(h.startswith(fieldName) for h in headers), 1) 982 983 def verify_http_server_response(self, response): 984 match = self.HTTPResponseMatch.search(response) 985 self.assertIsNotNone(match) 986 987 def test_http_1_1(self): 988 result = self.send_typical_request(b'GET / HTTP/1.1\r\n\r\n') 989 self.verify_http_server_response(result[0]) 990 self.verify_expected_headers(result[1:-1]) 991 self.verify_get_called() 992 self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') 993 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1') 994 self.assertEqual(self.handler.command, 'GET') 995 self.assertEqual(self.handler.path, '/') 996 self.assertEqual(self.handler.request_version, 'HTTP/1.1') 997 self.assertSequenceEqual(self.handler.headers.items(), ()) 998 999 def test_http_1_0(self): 1000 result = self.send_typical_request(b'GET / HTTP/1.0\r\n\r\n') 1001 self.verify_http_server_response(result[0]) 1002 self.verify_expected_headers(result[1:-1]) 1003 self.verify_get_called() 1004 self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') 1005 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.0') 1006 self.assertEqual(self.handler.command, 'GET') 1007 self.assertEqual(self.handler.path, '/') 1008 self.assertEqual(self.handler.request_version, 'HTTP/1.0') 1009 self.assertSequenceEqual(self.handler.headers.items(), ()) 1010 1011 def test_http_0_9(self): 1012 result = self.send_typical_request(b'GET / HTTP/0.9\r\n\r\n') 1013 self.assertEqual(len(result), 1) 1014 self.assertEqual(result[0], b'<html><body>Data</body></html>\r\n') 1015 self.verify_get_called() 1016 1017 def test_extra_space(self): 1018 result = self.send_typical_request( 1019 b'GET /spaced out HTTP/1.1\r\n' 1020 b'Host: dummy\r\n' 1021 b'\r\n' 1022 ) 1023 self.assertTrue(result[0].startswith(b'HTTP/1.1 400 ')) 1024 self.verify_expected_headers(result[1:result.index(b'\r\n')]) 1025 self.assertFalse(self.handler.get_called) 1026 1027 def test_with_continue_1_0(self): 1028 result = self.send_typical_request(b'GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n') 1029 self.verify_http_server_response(result[0]) 1030 self.verify_expected_headers(result[1:-1]) 1031 self.verify_get_called() 1032 self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') 1033 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.0') 1034 self.assertEqual(self.handler.command, 'GET') 1035 self.assertEqual(self.handler.path, '/') 1036 self.assertEqual(self.handler.request_version, 'HTTP/1.0') 1037 headers = (("Expect", "100-continue"),) 1038 self.assertSequenceEqual(self.handler.headers.items(), headers) 1039 1040 def test_with_continue_1_1(self): 1041 result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n') 1042 self.assertEqual(result[0], b'HTTP/1.1 100 Continue\r\n') 1043 self.assertEqual(result[1], b'\r\n') 1044 self.assertEqual(result[2], b'HTTP/1.1 200 OK\r\n') 1045 self.verify_expected_headers(result[2:-1]) 1046 self.verify_get_called() 1047 self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') 1048 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1') 1049 self.assertEqual(self.handler.command, 'GET') 1050 self.assertEqual(self.handler.path, '/') 1051 self.assertEqual(self.handler.request_version, 'HTTP/1.1') 1052 headers = (("Expect", "100-continue"),) 1053 self.assertSequenceEqual(self.handler.headers.items(), headers) 1054 1055 def test_header_buffering_of_send_error(self): 1056 1057 input = BytesIO(b'GET / HTTP/1.1\r\n\r\n') 1058 output = AuditableBytesIO() 1059 handler = SocketlessRequestHandler() 1060 handler.rfile = input 1061 handler.wfile = output 1062 handler.request_version = 'HTTP/1.1' 1063 handler.requestline = '' 1064 handler.command = None 1065 1066 handler.send_error(418) 1067 self.assertEqual(output.numWrites, 2) 1068 1069 def test_header_buffering_of_send_response_only(self): 1070 1071 input = BytesIO(b'GET / HTTP/1.1\r\n\r\n') 1072 output = AuditableBytesIO() 1073 handler = SocketlessRequestHandler() 1074 handler.rfile = input 1075 handler.wfile = output 1076 handler.request_version = 'HTTP/1.1' 1077 1078 handler.send_response_only(418) 1079 self.assertEqual(output.numWrites, 0) 1080 handler.end_headers() 1081 self.assertEqual(output.numWrites, 1) 1082 1083 def test_header_buffering_of_send_header(self): 1084 1085 input = BytesIO(b'GET / HTTP/1.1\r\n\r\n') 1086 output = AuditableBytesIO() 1087 handler = SocketlessRequestHandler() 1088 handler.rfile = input 1089 handler.wfile = output 1090 handler.request_version = 'HTTP/1.1' 1091 1092 handler.send_header('Foo', 'foo') 1093 handler.send_header('bar', 'bar') 1094 self.assertEqual(output.numWrites, 0) 1095 handler.end_headers() 1096 self.assertEqual(output.getData(), b'Foo: foo\r\nbar: bar\r\n\r\n') 1097 self.assertEqual(output.numWrites, 1) 1098 1099 def test_header_unbuffered_when_continue(self): 1100 1101 def _readAndReseek(f): 1102 pos = f.tell() 1103 f.seek(0) 1104 data = f.read() 1105 f.seek(pos) 1106 return data 1107 1108 input = BytesIO(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n') 1109 output = BytesIO() 1110 self.handler.rfile = input 1111 self.handler.wfile = output 1112 self.handler.request_version = 'HTTP/1.1' 1113 1114 self.handler.handle_one_request() 1115 self.assertNotEqual(_readAndReseek(output), b'') 1116 result = _readAndReseek(output).split(b'\r\n') 1117 self.assertEqual(result[0], b'HTTP/1.1 100 Continue') 1118 self.assertEqual(result[1], b'') 1119 self.assertEqual(result[2], b'HTTP/1.1 200 OK') 1120 1121 def test_with_continue_rejected(self): 1122 usual_handler = self.handler # Save to avoid breaking any subsequent tests. 1123 self.handler = RejectingSocketlessRequestHandler() 1124 result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n') 1125 self.assertEqual(result[0], b'HTTP/1.1 417 Expectation Failed\r\n') 1126 self.verify_expected_headers(result[1:-1]) 1127 # The expect handler should short circuit the usual get method by 1128 # returning false here, so get_called should be false 1129 self.assertFalse(self.handler.get_called) 1130 self.assertEqual(sum(r == b'Connection: close\r\n' for r in result[1:-1]), 1) 1131 self.handler = usual_handler # Restore to avoid breaking any subsequent tests. 1132 1133 def test_request_length(self): 1134 # Issue #10714: huge request lines are discarded, to avoid Denial 1135 # of Service attacks. 1136 result = self.send_typical_request(b'GET ' + b'x' * 65537) 1137 self.assertEqual(result[0], b'HTTP/1.1 414 Request-URI Too Long\r\n') 1138 self.assertFalse(self.handler.get_called) 1139 self.assertIsInstance(self.handler.requestline, str) 1140 1141 def test_header_length(self): 1142 # Issue #6791: same for headers 1143 result = self.send_typical_request( 1144 b'GET / HTTP/1.1\r\nX-Foo: bar' + b'r' * 65537 + b'\r\n\r\n') 1145 self.assertEqual(result[0], b'HTTP/1.1 431 Line too long\r\n') 1146 self.assertFalse(self.handler.get_called) 1147 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1') 1148 1149 def test_too_many_headers(self): 1150 result = self.send_typical_request( 1151 b'GET / HTTP/1.1\r\n' + b'X-Foo: bar\r\n' * 101 + b'\r\n') 1152 self.assertEqual(result[0], b'HTTP/1.1 431 Too many headers\r\n') 1153 self.assertFalse(self.handler.get_called) 1154 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1') 1155 1156 def test_html_escape_on_error(self): 1157 result = self.send_typical_request( 1158 b'<script>alert("hello")</script> / HTTP/1.1') 1159 result = b''.join(result) 1160 text = '<script>alert("hello")</script>' 1161 self.assertIn(html.escape(text, quote=False).encode('ascii'), result) 1162 1163 def test_close_connection(self): 1164 # handle_one_request() should be repeatedly called until 1165 # it sets close_connection 1166 def handle_one_request(): 1167 self.handler.close_connection = next(close_values) 1168 self.handler.handle_one_request = handle_one_request 1169 1170 close_values = iter((True,)) 1171 self.handler.handle() 1172 self.assertRaises(StopIteration, next, close_values) 1173 1174 close_values = iter((False, False, True)) 1175 self.handler.handle() 1176 self.assertRaises(StopIteration, next, close_values) 1177 1178 def test_date_time_string(self): 1179 now = time.time() 1180 # this is the old code that formats the timestamp 1181 year, month, day, hh, mm, ss, wd, y, z = time.gmtime(now) 1182 expected = "%s, %02d %3s %4d %02d:%02d:%02d GMT" % ( 1183 self.handler.weekdayname[wd], 1184 day, 1185 self.handler.monthname[month], 1186 year, hh, mm, ss 1187 ) 1188 self.assertEqual(self.handler.date_time_string(timestamp=now), expected) 1189 1190 1191class SimpleHTTPRequestHandlerTestCase(unittest.TestCase): 1192 """ Test url parsing """ 1193 def setUp(self): 1194 self.translated_1 = os.path.join(os.getcwd(), 'filename') 1195 self.translated_2 = os.path.join('foo', 'filename') 1196 self.translated_3 = os.path.join('bar', 'filename') 1197 self.handler_1 = SocketlessRequestHandler() 1198 self.handler_2 = SocketlessRequestHandler(directory='foo') 1199 self.handler_3 = SocketlessRequestHandler(directory=pathlib.PurePath('bar')) 1200 1201 def test_query_arguments(self): 1202 path = self.handler_1.translate_path('/filename') 1203 self.assertEqual(path, self.translated_1) 1204 path = self.handler_2.translate_path('/filename') 1205 self.assertEqual(path, self.translated_2) 1206 path = self.handler_3.translate_path('/filename') 1207 self.assertEqual(path, self.translated_3) 1208 1209 path = self.handler_1.translate_path('/filename?foo=bar') 1210 self.assertEqual(path, self.translated_1) 1211 path = self.handler_2.translate_path('/filename?foo=bar') 1212 self.assertEqual(path, self.translated_2) 1213 path = self.handler_3.translate_path('/filename?foo=bar') 1214 self.assertEqual(path, self.translated_3) 1215 1216 path = self.handler_1.translate_path('/filename?a=b&spam=eggs#zot') 1217 self.assertEqual(path, self.translated_1) 1218 path = self.handler_2.translate_path('/filename?a=b&spam=eggs#zot') 1219 self.assertEqual(path, self.translated_2) 1220 path = self.handler_3.translate_path('/filename?a=b&spam=eggs#zot') 1221 self.assertEqual(path, self.translated_3) 1222 1223 def test_start_with_double_slash(self): 1224 path = self.handler_1.translate_path('//filename') 1225 self.assertEqual(path, self.translated_1) 1226 path = self.handler_2.translate_path('//filename') 1227 self.assertEqual(path, self.translated_2) 1228 path = self.handler_3.translate_path('//filename') 1229 self.assertEqual(path, self.translated_3) 1230 1231 path = self.handler_1.translate_path('//filename?foo=bar') 1232 self.assertEqual(path, self.translated_1) 1233 path = self.handler_2.translate_path('//filename?foo=bar') 1234 self.assertEqual(path, self.translated_2) 1235 path = self.handler_3.translate_path('//filename?foo=bar') 1236 self.assertEqual(path, self.translated_3) 1237 1238 def test_windows_colon(self): 1239 with support.swap_attr(server.os, 'path', ntpath): 1240 path = self.handler_1.translate_path('c:c:c:foo/filename') 1241 path = path.replace(ntpath.sep, os.sep) 1242 self.assertEqual(path, self.translated_1) 1243 path = self.handler_2.translate_path('c:c:c:foo/filename') 1244 path = path.replace(ntpath.sep, os.sep) 1245 self.assertEqual(path, self.translated_2) 1246 path = self.handler_3.translate_path('c:c:c:foo/filename') 1247 path = path.replace(ntpath.sep, os.sep) 1248 self.assertEqual(path, self.translated_3) 1249 1250 path = self.handler_1.translate_path('\\c:../filename') 1251 path = path.replace(ntpath.sep, os.sep) 1252 self.assertEqual(path, self.translated_1) 1253 path = self.handler_2.translate_path('\\c:../filename') 1254 path = path.replace(ntpath.sep, os.sep) 1255 self.assertEqual(path, self.translated_2) 1256 path = self.handler_3.translate_path('\\c:../filename') 1257 path = path.replace(ntpath.sep, os.sep) 1258 self.assertEqual(path, self.translated_3) 1259 1260 path = self.handler_1.translate_path('c:\\c:..\\foo/filename') 1261 path = path.replace(ntpath.sep, os.sep) 1262 self.assertEqual(path, self.translated_1) 1263 path = self.handler_2.translate_path('c:\\c:..\\foo/filename') 1264 path = path.replace(ntpath.sep, os.sep) 1265 self.assertEqual(path, self.translated_2) 1266 path = self.handler_3.translate_path('c:\\c:..\\foo/filename') 1267 path = path.replace(ntpath.sep, os.sep) 1268 self.assertEqual(path, self.translated_3) 1269 1270 path = self.handler_1.translate_path('c:c:foo\\c:c:bar/filename') 1271 path = path.replace(ntpath.sep, os.sep) 1272 self.assertEqual(path, self.translated_1) 1273 path = self.handler_2.translate_path('c:c:foo\\c:c:bar/filename') 1274 path = path.replace(ntpath.sep, os.sep) 1275 self.assertEqual(path, self.translated_2) 1276 path = self.handler_3.translate_path('c:c:foo\\c:c:bar/filename') 1277 path = path.replace(ntpath.sep, os.sep) 1278 self.assertEqual(path, self.translated_3) 1279 1280 1281class MiscTestCase(unittest.TestCase): 1282 def test_all(self): 1283 expected = [] 1284 denylist = {'executable', 'nobody_uid', 'test'} 1285 for name in dir(server): 1286 if name.startswith('_') or name in denylist: 1287 continue 1288 module_object = getattr(server, name) 1289 if getattr(module_object, '__module__', None) == 'http.server': 1290 expected.append(name) 1291 self.assertCountEqual(server.__all__, expected) 1292 1293 1294class ScriptTestCase(unittest.TestCase): 1295 1296 def mock_server_class(self): 1297 return mock.MagicMock( 1298 return_value=mock.MagicMock( 1299 __enter__=mock.MagicMock( 1300 return_value=mock.MagicMock( 1301 socket=mock.MagicMock( 1302 getsockname=lambda: ('', 0), 1303 ), 1304 ), 1305 ), 1306 ), 1307 ) 1308 1309 @mock.patch('builtins.print') 1310 def test_server_test_unspec(self, _): 1311 mock_server = self.mock_server_class() 1312 server.test(ServerClass=mock_server, bind=None) 1313 self.assertIn( 1314 mock_server.address_family, 1315 (socket.AF_INET6, socket.AF_INET), 1316 ) 1317 1318 @mock.patch('builtins.print') 1319 def test_server_test_localhost(self, _): 1320 mock_server = self.mock_server_class() 1321 server.test(ServerClass=mock_server, bind="localhost") 1322 self.assertIn( 1323 mock_server.address_family, 1324 (socket.AF_INET6, socket.AF_INET), 1325 ) 1326 1327 ipv6_addrs = ( 1328 "::", 1329 "2001:0db8:85a3:0000:0000:8a2e:0370:7334", 1330 "::1", 1331 ) 1332 1333 ipv4_addrs = ( 1334 "0.0.0.0", 1335 "8.8.8.8", 1336 "127.0.0.1", 1337 ) 1338 1339 @mock.patch('builtins.print') 1340 def test_server_test_ipv6(self, _): 1341 for bind in self.ipv6_addrs: 1342 mock_server = self.mock_server_class() 1343 server.test(ServerClass=mock_server, bind=bind) 1344 self.assertEqual(mock_server.address_family, socket.AF_INET6) 1345 1346 @mock.patch('builtins.print') 1347 def test_server_test_ipv4(self, _): 1348 for bind in self.ipv4_addrs: 1349 mock_server = self.mock_server_class() 1350 server.test(ServerClass=mock_server, bind=bind) 1351 self.assertEqual(mock_server.address_family, socket.AF_INET) 1352 1353 1354def setUpModule(): 1355 unittest.addModuleCleanup(os.chdir, os.getcwd()) 1356 1357 1358if __name__ == '__main__': 1359 unittest.main() 1360