1"""Unittests for the various HTTPServer modules. 2 3Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>, 4Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest. 5""" 6from collections import OrderedDict 7from http.server import BaseHTTPRequestHandler, HTTPServer, \ 8 SimpleHTTPRequestHandler, CGIHTTPRequestHandler 9from http import server, HTTPStatus 10 11import os 12import socket 13import sys 14import re 15import base64 16import ntpath 17import pathlib 18import shutil 19import email.message 20import email.utils 21import html 22import http, http.client 23import urllib.parse 24import tempfile 25import time 26import datetime 27import threading 28from unittest import mock 29from io import BytesIO 30 31import unittest 32from test import support 33 34 35class NoLogRequestHandler: 36 def log_message(self, *args): 37 # don't write log messages to stderr 38 pass 39 40 def read(self, n=None): 41 return '' 42 43 44class TestServerThread(threading.Thread): 45 def __init__(self, test_object, request_handler): 46 threading.Thread.__init__(self) 47 self.request_handler = request_handler 48 self.test_object = test_object 49 50 def run(self): 51 self.server = HTTPServer(('localhost', 0), self.request_handler) 52 self.test_object.HOST, self.test_object.PORT = self.server.socket.getsockname() 53 self.test_object.server_started.set() 54 self.test_object = None 55 try: 56 self.server.serve_forever(0.05) 57 finally: 58 self.server.server_close() 59 60 def stop(self): 61 self.server.shutdown() 62 self.join() 63 64 65class BaseTestCase(unittest.TestCase): 66 def setUp(self): 67 self._threads = support.threading_setup() 68 os.environ = support.EnvironmentVarGuard() 69 self.server_started = threading.Event() 70 self.thread = TestServerThread(self, self.request_handler) 71 self.thread.start() 72 self.server_started.wait() 73 74 def tearDown(self): 75 self.thread.stop() 76 self.thread = None 77 os.environ.__exit__() 78 support.threading_cleanup(*self._threads) 79 80 def request(self, uri, method='GET', body=None, headers={}): 81 self.connection = http.client.HTTPConnection(self.HOST, self.PORT) 82 self.connection.request(method, uri, body, headers) 83 return self.connection.getresponse() 84 85 86class BaseHTTPServerTestCase(BaseTestCase): 87 class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler): 88 protocol_version = 'HTTP/1.1' 89 default_request_version = 'HTTP/1.1' 90 91 def do_TEST(self): 92 self.send_response(HTTPStatus.NO_CONTENT) 93 self.send_header('Content-Type', 'text/html') 94 self.send_header('Connection', 'close') 95 self.end_headers() 96 97 def do_KEEP(self): 98 self.send_response(HTTPStatus.NO_CONTENT) 99 self.send_header('Content-Type', 'text/html') 100 self.send_header('Connection', 'keep-alive') 101 self.end_headers() 102 103 def do_KEYERROR(self): 104 self.send_error(999) 105 106 def do_NOTFOUND(self): 107 self.send_error(HTTPStatus.NOT_FOUND) 108 109 def do_EXPLAINERROR(self): 110 self.send_error(999, "Short Message", 111 "This is a long \n explanation") 112 113 def do_CUSTOM(self): 114 self.send_response(999) 115 self.send_header('Content-Type', 'text/html') 116 self.send_header('Connection', 'close') 117 self.end_headers() 118 119 def do_LATINONEHEADER(self): 120 self.send_response(999) 121 self.send_header('X-Special', 'Dängerous Mind') 122 self.send_header('Connection', 'close') 123 self.end_headers() 124 body = self.headers['x-special-incoming'].encode('utf-8') 125 self.wfile.write(body) 126 127 def do_SEND_ERROR(self): 128 self.send_error(int(self.path[1:])) 129 130 def do_HEAD(self): 131 self.send_error(int(self.path[1:])) 132 133 def setUp(self): 134 BaseTestCase.setUp(self) 135 self.con = http.client.HTTPConnection(self.HOST, self.PORT) 136 self.con.connect() 137 138 def test_command(self): 139 self.con.request('GET', '/') 140 res = self.con.getresponse() 141 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 142 143 def test_request_line_trimming(self): 144 self.con._http_vsn_str = 'HTTP/1.1\n' 145 self.con.putrequest('XYZBOGUS', '/') 146 self.con.endheaders() 147 res = self.con.getresponse() 148 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 149 150 def test_version_bogus(self): 151 self.con._http_vsn_str = 'FUBAR' 152 self.con.putrequest('GET', '/') 153 self.con.endheaders() 154 res = self.con.getresponse() 155 self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) 156 157 def test_version_digits(self): 158 self.con._http_vsn_str = 'HTTP/9.9.9' 159 self.con.putrequest('GET', '/') 160 self.con.endheaders() 161 res = self.con.getresponse() 162 self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) 163 164 def test_version_none_get(self): 165 self.con._http_vsn_str = '' 166 self.con.putrequest('GET', '/') 167 self.con.endheaders() 168 res = self.con.getresponse() 169 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 170 171 def test_version_none(self): 172 # Test that a valid method is rejected when not HTTP/1.x 173 self.con._http_vsn_str = '' 174 self.con.putrequest('CUSTOM', '/') 175 self.con.endheaders() 176 res = self.con.getresponse() 177 self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) 178 179 def test_version_invalid(self): 180 self.con._http_vsn = 99 181 self.con._http_vsn_str = 'HTTP/9.9' 182 self.con.putrequest('GET', '/') 183 self.con.endheaders() 184 res = self.con.getresponse() 185 self.assertEqual(res.status, HTTPStatus.HTTP_VERSION_NOT_SUPPORTED) 186 187 def test_send_blank(self): 188 self.con._http_vsn_str = '' 189 self.con.putrequest('', '') 190 self.con.endheaders() 191 res = self.con.getresponse() 192 self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) 193 194 def test_header_close(self): 195 self.con.putrequest('GET', '/') 196 self.con.putheader('Connection', 'close') 197 self.con.endheaders() 198 res = self.con.getresponse() 199 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 200 201 def test_header_keep_alive(self): 202 self.con._http_vsn_str = 'HTTP/1.1' 203 self.con.putrequest('GET', '/') 204 self.con.putheader('Connection', 'keep-alive') 205 self.con.endheaders() 206 res = self.con.getresponse() 207 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 208 209 def test_handler(self): 210 self.con.request('TEST', '/') 211 res = self.con.getresponse() 212 self.assertEqual(res.status, HTTPStatus.NO_CONTENT) 213 214 def test_return_header_keep_alive(self): 215 self.con.request('KEEP', '/') 216 res = self.con.getresponse() 217 self.assertEqual(res.getheader('Connection'), 'keep-alive') 218 self.con.request('TEST', '/') 219 self.addCleanup(self.con.close) 220 221 def test_internal_key_error(self): 222 self.con.request('KEYERROR', '/') 223 res = self.con.getresponse() 224 self.assertEqual(res.status, 999) 225 226 def test_return_custom_status(self): 227 self.con.request('CUSTOM', '/') 228 res = self.con.getresponse() 229 self.assertEqual(res.status, 999) 230 231 def test_return_explain_error(self): 232 self.con.request('EXPLAINERROR', '/') 233 res = self.con.getresponse() 234 self.assertEqual(res.status, 999) 235 self.assertTrue(int(res.getheader('Content-Length'))) 236 237 def test_latin1_header(self): 238 self.con.request('LATINONEHEADER', '/', headers={ 239 'X-Special-Incoming': 'Ärger mit Unicode' 240 }) 241 res = self.con.getresponse() 242 self.assertEqual(res.getheader('X-Special'), 'Dängerous Mind') 243 self.assertEqual(res.read(), 'Ärger mit Unicode'.encode('utf-8')) 244 245 def test_error_content_length(self): 246 # Issue #16088: standard error responses should have a content-length 247 self.con.request('NOTFOUND', '/') 248 res = self.con.getresponse() 249 self.assertEqual(res.status, HTTPStatus.NOT_FOUND) 250 251 data = res.read() 252 self.assertEqual(int(res.getheader('Content-Length')), len(data)) 253 254 def test_send_error(self): 255 allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED, 256 HTTPStatus.RESET_CONTENT) 257 for code in (HTTPStatus.NO_CONTENT, HTTPStatus.NOT_MODIFIED, 258 HTTPStatus.PROCESSING, HTTPStatus.RESET_CONTENT, 259 HTTPStatus.SWITCHING_PROTOCOLS): 260 self.con.request('SEND_ERROR', '/{}'.format(code)) 261 res = self.con.getresponse() 262 self.assertEqual(code, res.status) 263 self.assertEqual(None, res.getheader('Content-Length')) 264 self.assertEqual(None, res.getheader('Content-Type')) 265 if code not in allow_transfer_encoding_codes: 266 self.assertEqual(None, res.getheader('Transfer-Encoding')) 267 268 data = res.read() 269 self.assertEqual(b'', data) 270 271 def test_head_via_send_error(self): 272 allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED, 273 HTTPStatus.RESET_CONTENT) 274 for code in (HTTPStatus.OK, HTTPStatus.NO_CONTENT, 275 HTTPStatus.NOT_MODIFIED, HTTPStatus.RESET_CONTENT, 276 HTTPStatus.SWITCHING_PROTOCOLS): 277 self.con.request('HEAD', '/{}'.format(code)) 278 res = self.con.getresponse() 279 self.assertEqual(code, res.status) 280 if code == HTTPStatus.OK: 281 self.assertTrue(int(res.getheader('Content-Length')) > 0) 282 self.assertIn('text/html', res.getheader('Content-Type')) 283 else: 284 self.assertEqual(None, res.getheader('Content-Length')) 285 self.assertEqual(None, res.getheader('Content-Type')) 286 if code not in allow_transfer_encoding_codes: 287 self.assertEqual(None, res.getheader('Transfer-Encoding')) 288 289 data = res.read() 290 self.assertEqual(b'', data) 291 292 293class RequestHandlerLoggingTestCase(BaseTestCase): 294 class request_handler(BaseHTTPRequestHandler): 295 protocol_version = 'HTTP/1.1' 296 default_request_version = 'HTTP/1.1' 297 298 def do_GET(self): 299 self.send_response(HTTPStatus.OK) 300 self.end_headers() 301 302 def do_ERROR(self): 303 self.send_error(HTTPStatus.NOT_FOUND, 'File not found') 304 305 def test_get(self): 306 self.con = http.client.HTTPConnection(self.HOST, self.PORT) 307 self.con.connect() 308 309 with support.captured_stderr() as err: 310 self.con.request('GET', '/') 311 self.con.getresponse() 312 313 self.assertTrue( 314 err.getvalue().endswith('"GET / HTTP/1.1" 200 -\n')) 315 316 def test_err(self): 317 self.con = http.client.HTTPConnection(self.HOST, self.PORT) 318 self.con.connect() 319 320 with support.captured_stderr() as err: 321 self.con.request('ERROR', '/') 322 self.con.getresponse() 323 324 lines = err.getvalue().split('\n') 325 self.assertTrue(lines[0].endswith('code 404, message File not found')) 326 self.assertTrue(lines[1].endswith('"ERROR / HTTP/1.1" 404 -')) 327 328 329class SimpleHTTPServerTestCase(BaseTestCase): 330 class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler): 331 pass 332 333 def setUp(self): 334 super().setUp() 335 self.cwd = os.getcwd() 336 basetempdir = tempfile.gettempdir() 337 os.chdir(basetempdir) 338 self.data = b'We are the knights who say Ni!' 339 self.tempdir = tempfile.mkdtemp(dir=basetempdir) 340 self.tempdir_name = os.path.basename(self.tempdir) 341 self.base_url = '/' + self.tempdir_name 342 tempname = os.path.join(self.tempdir, 'test') 343 with open(tempname, 'wb') as temp: 344 temp.write(self.data) 345 temp.flush() 346 mtime = os.stat(tempname).st_mtime 347 # compute last modification datetime for browser cache tests 348 last_modif = datetime.datetime.fromtimestamp(mtime, 349 datetime.timezone.utc) 350 self.last_modif_datetime = last_modif.replace(microsecond=0) 351 self.last_modif_header = email.utils.formatdate( 352 last_modif.timestamp(), usegmt=True) 353 354 def tearDown(self): 355 try: 356 os.chdir(self.cwd) 357 try: 358 shutil.rmtree(self.tempdir) 359 except: 360 pass 361 finally: 362 super().tearDown() 363 364 def check_status_and_reason(self, response, status, data=None): 365 def close_conn(): 366 """Don't close reader yet so we can check if there was leftover 367 buffered input""" 368 nonlocal reader 369 reader = response.fp 370 response.fp = None 371 reader = None 372 response._close_conn = close_conn 373 374 body = response.read() 375 self.assertTrue(response) 376 self.assertEqual(response.status, status) 377 self.assertIsNotNone(response.reason) 378 if data: 379 self.assertEqual(data, body) 380 # Ensure the server has not set up a persistent connection, and has 381 # not sent any extra data 382 self.assertEqual(response.version, 10) 383 self.assertEqual(response.msg.get("Connection", "close"), "close") 384 self.assertEqual(reader.read(30), b'', 'Connection should be closed') 385 386 reader.close() 387 return body 388 389 @unittest.skipIf(sys.platform == 'darwin', 390 'undecodable name cannot always be decoded on macOS') 391 @unittest.skipIf(sys.platform == 'win32', 392 'undecodable name cannot be decoded on win32') 393 @unittest.skipUnless(support.TESTFN_UNDECODABLE, 394 'need support.TESTFN_UNDECODABLE') 395 def test_undecodable_filename(self): 396 enc = sys.getfilesystemencoding() 397 filename = os.fsdecode(support.TESTFN_UNDECODABLE) + '.txt' 398 with open(os.path.join(self.tempdir, filename), 'wb') as f: 399 f.write(support.TESTFN_UNDECODABLE) 400 response = self.request(self.base_url + '/') 401 if sys.platform == 'darwin': 402 # On Mac OS the HFS+ filesystem replaces bytes that aren't valid 403 # UTF-8 into a percent-encoded value. 404 for name in os.listdir(self.tempdir): 405 if name != 'test': # Ignore a filename created in setUp(). 406 filename = name 407 break 408 body = self.check_status_and_reason(response, HTTPStatus.OK) 409 quotedname = urllib.parse.quote(filename, errors='surrogatepass') 410 self.assertIn(('href="%s"' % quotedname) 411 .encode(enc, 'surrogateescape'), body) 412 self.assertIn(('>%s<' % html.escape(filename, quote=False)) 413 .encode(enc, 'surrogateescape'), body) 414 response = self.request(self.base_url + '/' + quotedname) 415 self.check_status_and_reason(response, HTTPStatus.OK, 416 data=support.TESTFN_UNDECODABLE) 417 418 def test_get_dir_redirect_location_domain_injection_bug(self): 419 """Ensure //evil.co/..%2f../../X does not put //evil.co/ in Location. 420 421 //netloc/ in a Location header is a redirect to a new host. 422 https://github.com/python/cpython/issues/87389 423 424 This checks that a path resolving to a directory on our server cannot 425 resolve into a redirect to another server. 426 """ 427 os.mkdir(os.path.join(self.tempdir, 'existing_directory')) 428 url = f'/python.org/..%2f..%2f..%2f..%2f..%2f../%0a%0d/../{self.tempdir_name}/existing_directory' 429 expected_location = f'{url}/' # /python.org.../ single slash single prefix, trailing slash 430 # Canonicalizes to /tmp/tempdir_name/existing_directory which does 431 # exist and is a dir, triggering the 301 redirect logic. 432 response = self.request(url) 433 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 434 location = response.getheader('Location') 435 self.assertEqual(location, expected_location, msg='non-attack failed!') 436 437 # //python.org... multi-slash prefix, no trailing slash 438 attack_url = f'/{url}' 439 response = self.request(attack_url) 440 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 441 location = response.getheader('Location') 442 self.assertFalse(location.startswith('//'), msg=location) 443 self.assertEqual(location, expected_location, 444 msg='Expected Location header to start with a single / and ' 445 'end with a / as this is a directory redirect.') 446 447 # ///python.org... triple-slash prefix, no trailing slash 448 attack3_url = f'//{url}' 449 response = self.request(attack3_url) 450 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 451 self.assertEqual(response.getheader('Location'), expected_location) 452 453 # If the second word in the http request (Request-URI for the http 454 # method) is a full URI, we don't worry about it, as that'll be parsed 455 # and reassembled as a full URI within BaseHTTPRequestHandler.send_head 456 # so no errant scheme-less //netloc//evil.co/ domain mixup can happen. 457 attack_scheme_netloc_2slash_url = f'https://pypi.org/{url}' 458 expected_scheme_netloc_location = f'{attack_scheme_netloc_2slash_url}/' 459 response = self.request(attack_scheme_netloc_2slash_url) 460 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 461 location = response.getheader('Location') 462 # We're just ensuring that the scheme and domain make it through, if 463 # there are or aren't multiple slashes at the start of the path that 464 # follows that isn't important in this Location: header. 465 self.assertTrue(location.startswith('https://pypi.org/'), msg=location) 466 467 def test_get(self): 468 #constructs the path relative to the root directory of the HTTPServer 469 response = self.request(self.base_url + '/test') 470 self.check_status_and_reason(response, HTTPStatus.OK, data=self.data) 471 # check for trailing "/" which should return 404. See Issue17324 472 response = self.request(self.base_url + '/test/') 473 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 474 response = self.request(self.base_url + '/') 475 self.check_status_and_reason(response, HTTPStatus.OK) 476 response = self.request(self.base_url) 477 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 478 response = self.request(self.base_url + '/?hi=2') 479 self.check_status_and_reason(response, HTTPStatus.OK) 480 response = self.request(self.base_url + '?hi=1') 481 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 482 self.assertEqual(response.getheader("Location"), 483 self.base_url + "/?hi=1") 484 response = self.request('/ThisDoesNotExist') 485 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 486 response = self.request('/' + 'ThisDoesNotExist' + '/') 487 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 488 489 data = b"Dummy index file\r\n" 490 with open(os.path.join(self.tempdir_name, 'index.html'), 'wb') as f: 491 f.write(data) 492 response = self.request(self.base_url + '/') 493 self.check_status_and_reason(response, HTTPStatus.OK, data) 494 495 # chmod() doesn't work as expected on Windows, and filesystem 496 # permissions are ignored by root on Unix. 497 if os.name == 'posix' and os.geteuid() != 0: 498 os.chmod(self.tempdir, 0) 499 try: 500 response = self.request(self.base_url + '/') 501 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 502 finally: 503 os.chmod(self.tempdir, 0o755) 504 505 def test_head(self): 506 response = self.request( 507 self.base_url + '/test', method='HEAD') 508 self.check_status_and_reason(response, HTTPStatus.OK) 509 self.assertEqual(response.getheader('content-length'), 510 str(len(self.data))) 511 self.assertEqual(response.getheader('content-type'), 512 'application/octet-stream') 513 514 def test_browser_cache(self): 515 """Check that when a request to /test is sent with the request header 516 If-Modified-Since set to date of last modification, the server returns 517 status code 304, not 200 518 """ 519 headers = email.message.Message() 520 headers['If-Modified-Since'] = self.last_modif_header 521 response = self.request(self.base_url + '/test', headers=headers) 522 self.check_status_and_reason(response, HTTPStatus.NOT_MODIFIED) 523 524 # one hour after last modification : must return 304 525 new_dt = self.last_modif_datetime + datetime.timedelta(hours=1) 526 headers = email.message.Message() 527 headers['If-Modified-Since'] = email.utils.format_datetime(new_dt, 528 usegmt=True) 529 response = self.request(self.base_url + '/test', headers=headers) 530 self.check_status_and_reason(response, HTTPStatus.NOT_MODIFIED) 531 532 def test_browser_cache_file_changed(self): 533 # with If-Modified-Since earlier than Last-Modified, must return 200 534 dt = self.last_modif_datetime 535 # build datetime object : 365 days before last modification 536 old_dt = dt - datetime.timedelta(days=365) 537 headers = email.message.Message() 538 headers['If-Modified-Since'] = email.utils.format_datetime(old_dt, 539 usegmt=True) 540 response = self.request(self.base_url + '/test', headers=headers) 541 self.check_status_and_reason(response, HTTPStatus.OK) 542 543 def test_browser_cache_with_If_None_Match_header(self): 544 # if If-None-Match header is present, ignore If-Modified-Since 545 546 headers = email.message.Message() 547 headers['If-Modified-Since'] = self.last_modif_header 548 headers['If-None-Match'] = "*" 549 response = self.request(self.base_url + '/test', headers=headers) 550 self.check_status_and_reason(response, HTTPStatus.OK) 551 552 def test_invalid_requests(self): 553 response = self.request('/', method='FOO') 554 self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED) 555 # requests must be case sensitive,so this should fail too 556 response = self.request('/', method='custom') 557 self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED) 558 response = self.request('/', method='GETs') 559 self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED) 560 561 def test_last_modified(self): 562 """Checks that the datetime returned in Last-Modified response header 563 is the actual datetime of last modification, rounded to the second 564 """ 565 response = self.request(self.base_url + '/test') 566 self.check_status_and_reason(response, HTTPStatus.OK, data=self.data) 567 last_modif_header = response.headers['Last-modified'] 568 self.assertEqual(last_modif_header, self.last_modif_header) 569 570 def test_path_without_leading_slash(self): 571 response = self.request(self.tempdir_name + '/test') 572 self.check_status_and_reason(response, HTTPStatus.OK, data=self.data) 573 response = self.request(self.tempdir_name + '/test/') 574 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 575 response = self.request(self.tempdir_name + '/') 576 self.check_status_and_reason(response, HTTPStatus.OK) 577 response = self.request(self.tempdir_name) 578 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 579 response = self.request(self.tempdir_name + '/?hi=2') 580 self.check_status_and_reason(response, HTTPStatus.OK) 581 response = self.request(self.tempdir_name + '?hi=1') 582 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 583 self.assertEqual(response.getheader("Location"), 584 self.tempdir_name + "/?hi=1") 585 586 def test_html_escape_filename(self): 587 filename = '<test&>.txt' 588 fullpath = os.path.join(self.tempdir, filename) 589 590 try: 591 open(fullpath, 'w').close() 592 except OSError: 593 raise unittest.SkipTest('Can not create file %s on current file ' 594 'system' % filename) 595 596 try: 597 response = self.request(self.base_url + '/') 598 body = self.check_status_and_reason(response, HTTPStatus.OK) 599 enc = response.headers.get_content_charset() 600 finally: 601 os.unlink(fullpath) # avoid affecting test_undecodable_filename 602 603 self.assertIsNotNone(enc) 604 html_text = '>%s<' % html.escape(filename, quote=False) 605 self.assertIn(html_text.encode(enc), body) 606 607 608cgi_file1 = """\ 609#!%s 610 611print("Content-type: text/html") 612print() 613print("Hello World") 614""" 615 616cgi_file2 = """\ 617#!%s 618import cgi 619 620print("Content-type: text/html") 621print() 622 623form = cgi.FieldStorage() 624print("%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"), 625 form.getfirst("bacon"))) 626""" 627 628cgi_file4 = """\ 629#!%s 630import os 631 632print("Content-type: text/html") 633print() 634 635print(os.environ["%s"]) 636""" 637 638cgi_file6 = """\ 639#!%s 640import os 641 642print("Content-type: text/plain") 643print() 644print(repr(os.environ)) 645""" 646 647 648@unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0, 649 "This test can't be run reliably as root (issue #13308).") 650class CGIHTTPServerTestCase(BaseTestCase): 651 class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler): 652 pass 653 654 linesep = os.linesep.encode('ascii') 655 656 def setUp(self): 657 BaseTestCase.setUp(self) 658 self.cwd = os.getcwd() 659 self.parent_dir = tempfile.mkdtemp() 660 self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin') 661 self.cgi_child_dir = os.path.join(self.cgi_dir, 'child-dir') 662 self.sub_dir_1 = os.path.join(self.parent_dir, 'sub') 663 self.sub_dir_2 = os.path.join(self.sub_dir_1, 'dir') 664 self.cgi_dir_in_sub_dir = os.path.join(self.sub_dir_2, 'cgi-bin') 665 os.mkdir(self.cgi_dir) 666 os.mkdir(self.cgi_child_dir) 667 os.mkdir(self.sub_dir_1) 668 os.mkdir(self.sub_dir_2) 669 os.mkdir(self.cgi_dir_in_sub_dir) 670 self.nocgi_path = None 671 self.file1_path = None 672 self.file2_path = None 673 self.file3_path = None 674 self.file4_path = None 675 self.file5_path = None 676 677 # The shebang line should be pure ASCII: use symlink if possible. 678 # See issue #7668. 679 self._pythonexe_symlink = None 680 if support.can_symlink(): 681 self.pythonexe = os.path.join(self.parent_dir, 'python') 682 self._pythonexe_symlink = support.PythonSymlink(self.pythonexe).__enter__() 683 else: 684 self.pythonexe = sys.executable 685 686 try: 687 # The python executable path is written as the first line of the 688 # CGI Python script. The encoding cookie cannot be used, and so the 689 # path should be encodable to the default script encoding (utf-8) 690 self.pythonexe.encode('utf-8') 691 except UnicodeEncodeError: 692 self.tearDown() 693 self.skipTest("Python executable path is not encodable to utf-8") 694 695 self.nocgi_path = os.path.join(self.parent_dir, 'nocgi.py') 696 with open(self.nocgi_path, 'w') as fp: 697 fp.write(cgi_file1 % self.pythonexe) 698 os.chmod(self.nocgi_path, 0o777) 699 700 self.file1_path = os.path.join(self.cgi_dir, 'file1.py') 701 with open(self.file1_path, 'w', encoding='utf-8') as file1: 702 file1.write(cgi_file1 % self.pythonexe) 703 os.chmod(self.file1_path, 0o777) 704 705 self.file2_path = os.path.join(self.cgi_dir, 'file2.py') 706 with open(self.file2_path, 'w', encoding='utf-8') as file2: 707 file2.write(cgi_file2 % self.pythonexe) 708 os.chmod(self.file2_path, 0o777) 709 710 self.file3_path = os.path.join(self.cgi_child_dir, 'file3.py') 711 with open(self.file3_path, 'w', encoding='utf-8') as file3: 712 file3.write(cgi_file1 % self.pythonexe) 713 os.chmod(self.file3_path, 0o777) 714 715 self.file4_path = os.path.join(self.cgi_dir, 'file4.py') 716 with open(self.file4_path, 'w', encoding='utf-8') as file4: 717 file4.write(cgi_file4 % (self.pythonexe, 'QUERY_STRING')) 718 os.chmod(self.file4_path, 0o777) 719 720 self.file5_path = os.path.join(self.cgi_dir_in_sub_dir, 'file5.py') 721 with open(self.file5_path, 'w', encoding='utf-8') as file5: 722 file5.write(cgi_file1 % self.pythonexe) 723 os.chmod(self.file5_path, 0o777) 724 725 self.file6_path = os.path.join(self.cgi_dir, 'file6.py') 726 with open(self.file6_path, 'w', encoding='utf-8') as file6: 727 file6.write(cgi_file6 % self.pythonexe) 728 os.chmod(self.file6_path, 0o777) 729 730 os.chdir(self.parent_dir) 731 732 def tearDown(self): 733 try: 734 os.chdir(self.cwd) 735 if self._pythonexe_symlink: 736 self._pythonexe_symlink.__exit__(None, None, None) 737 if self.nocgi_path: 738 os.remove(self.nocgi_path) 739 if self.file1_path: 740 os.remove(self.file1_path) 741 if self.file2_path: 742 os.remove(self.file2_path) 743 if self.file3_path: 744 os.remove(self.file3_path) 745 if self.file4_path: 746 os.remove(self.file4_path) 747 if self.file5_path: 748 os.remove(self.file5_path) 749 if self.file6_path: 750 os.remove(self.file6_path) 751 os.rmdir(self.cgi_child_dir) 752 os.rmdir(self.cgi_dir) 753 os.rmdir(self.cgi_dir_in_sub_dir) 754 os.rmdir(self.sub_dir_2) 755 os.rmdir(self.sub_dir_1) 756 os.rmdir(self.parent_dir) 757 finally: 758 BaseTestCase.tearDown(self) 759 760 def test_url_collapse_path(self): 761 # verify tail is the last portion and head is the rest on proper urls 762 test_vectors = { 763 '': '//', 764 '..': IndexError, 765 '/.//..': IndexError, 766 '/': '//', 767 '//': '//', 768 '/\\': '//\\', 769 '/.//': '//', 770 'cgi-bin/file1.py': '/cgi-bin/file1.py', 771 '/cgi-bin/file1.py': '/cgi-bin/file1.py', 772 'a': '//a', 773 '/a': '//a', 774 '//a': '//a', 775 './a': '//a', 776 './C:/': '/C:/', 777 '/a/b': '/a/b', 778 '/a/b/': '/a/b/', 779 '/a/b/.': '/a/b/', 780 '/a/b/c/..': '/a/b/', 781 '/a/b/c/../d': '/a/b/d', 782 '/a/b/c/../d/e/../f': '/a/b/d/f', 783 '/a/b/c/../d/e/../../f': '/a/b/f', 784 '/a/b/c/../d/e/.././././..//f': '/a/b/f', 785 '../a/b/c/../d/e/.././././..//f': IndexError, 786 '/a/b/c/../d/e/../../../f': '/a/f', 787 '/a/b/c/../d/e/../../../../f': '//f', 788 '/a/b/c/../d/e/../../../../../f': IndexError, 789 '/a/b/c/../d/e/../../../../f/..': '//', 790 '/a/b/c/../d/e/../../../../f/../.': '//', 791 } 792 for path, expected in test_vectors.items(): 793 if isinstance(expected, type) and issubclass(expected, Exception): 794 self.assertRaises(expected, 795 server._url_collapse_path, path) 796 else: 797 actual = server._url_collapse_path(path) 798 self.assertEqual(expected, actual, 799 msg='path = %r\nGot: %r\nWanted: %r' % 800 (path, actual, expected)) 801 802 def test_headers_and_content(self): 803 res = self.request('/cgi-bin/file1.py') 804 self.assertEqual( 805 (res.read(), res.getheader('Content-type'), res.status), 806 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK)) 807 808 def test_issue19435(self): 809 res = self.request('///////////nocgi.py/../cgi-bin/nothere.sh') 810 self.assertEqual(res.status, HTTPStatus.NOT_FOUND) 811 812 def test_post(self): 813 params = urllib.parse.urlencode( 814 {'spam' : 1, 'eggs' : 'python', 'bacon' : 123456}) 815 headers = {'Content-type' : 'application/x-www-form-urlencoded'} 816 res = self.request('/cgi-bin/file2.py', 'POST', params, headers) 817 818 self.assertEqual(res.read(), b'1, python, 123456' + self.linesep) 819 820 def test_invaliduri(self): 821 res = self.request('/cgi-bin/invalid') 822 res.read() 823 self.assertEqual(res.status, HTTPStatus.NOT_FOUND) 824 825 def test_authorization(self): 826 headers = {b'Authorization' : b'Basic ' + 827 base64.b64encode(b'username:pass')} 828 res = self.request('/cgi-bin/file1.py', 'GET', headers=headers) 829 self.assertEqual( 830 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 831 (res.read(), res.getheader('Content-type'), res.status)) 832 833 def test_no_leading_slash(self): 834 # http://bugs.python.org/issue2254 835 res = self.request('cgi-bin/file1.py') 836 self.assertEqual( 837 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 838 (res.read(), res.getheader('Content-type'), res.status)) 839 840 def test_os_environ_is_not_altered(self): 841 signature = "Test CGI Server" 842 os.environ['SERVER_SOFTWARE'] = signature 843 res = self.request('/cgi-bin/file1.py') 844 self.assertEqual( 845 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 846 (res.read(), res.getheader('Content-type'), res.status)) 847 self.assertEqual(os.environ['SERVER_SOFTWARE'], signature) 848 849 def test_urlquote_decoding_in_cgi_check(self): 850 res = self.request('/cgi-bin%2ffile1.py') 851 self.assertEqual( 852 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 853 (res.read(), res.getheader('Content-type'), res.status)) 854 855 def test_nested_cgi_path_issue21323(self): 856 res = self.request('/cgi-bin/child-dir/file3.py') 857 self.assertEqual( 858 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 859 (res.read(), res.getheader('Content-type'), res.status)) 860 861 def test_query_with_multiple_question_mark(self): 862 res = self.request('/cgi-bin/file4.py?a=b?c=d') 863 self.assertEqual( 864 (b'a=b?c=d' + self.linesep, 'text/html', HTTPStatus.OK), 865 (res.read(), res.getheader('Content-type'), res.status)) 866 867 def test_query_with_continuous_slashes(self): 868 res = self.request('/cgi-bin/file4.py?k=aa%2F%2Fbb&//q//p//=//a//b//') 869 self.assertEqual( 870 (b'k=aa%2F%2Fbb&//q//p//=//a//b//' + self.linesep, 871 'text/html', HTTPStatus.OK), 872 (res.read(), res.getheader('Content-type'), res.status)) 873 874 def test_cgi_path_in_sub_directories(self): 875 try: 876 CGIHTTPRequestHandler.cgi_directories.append('/sub/dir/cgi-bin') 877 res = self.request('/sub/dir/cgi-bin/file5.py') 878 self.assertEqual( 879 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 880 (res.read(), res.getheader('Content-type'), res.status)) 881 finally: 882 CGIHTTPRequestHandler.cgi_directories.remove('/sub/dir/cgi-bin') 883 884 def test_accept(self): 885 browser_accept = \ 886 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8' 887 tests = ( 888 ((('Accept', browser_accept),), browser_accept), 889 ((), ''), 890 # Hack case to get two values for the one header 891 ((('Accept', 'text/html'), ('ACCEPT', 'text/plain')), 892 'text/html,text/plain'), 893 ) 894 for headers, expected in tests: 895 headers = OrderedDict(headers) 896 with self.subTest(headers): 897 res = self.request('/cgi-bin/file6.py', 'GET', headers=headers) 898 self.assertEqual(http.HTTPStatus.OK, res.status) 899 expected = f"'HTTP_ACCEPT': {expected!r}" 900 self.assertIn(expected.encode('ascii'), res.read()) 901 902 903class SocketlessRequestHandler(SimpleHTTPRequestHandler): 904 def __init__(self, directory=None): 905 request = mock.Mock() 906 request.makefile.return_value = BytesIO() 907 super().__init__(request, None, None, directory=directory) 908 909 self.get_called = False 910 self.protocol_version = "HTTP/1.1" 911 912 def do_GET(self): 913 self.get_called = True 914 self.send_response(HTTPStatus.OK) 915 self.send_header('Content-Type', 'text/html') 916 self.end_headers() 917 self.wfile.write(b'<html><body>Data</body></html>\r\n') 918 919 def log_message(self, format, *args): 920 pass 921 922class RejectingSocketlessRequestHandler(SocketlessRequestHandler): 923 def handle_expect_100(self): 924 self.send_error(HTTPStatus.EXPECTATION_FAILED) 925 return False 926 927 928class AuditableBytesIO: 929 930 def __init__(self): 931 self.datas = [] 932 933 def write(self, data): 934 self.datas.append(data) 935 936 def getData(self): 937 return b''.join(self.datas) 938 939 @property 940 def numWrites(self): 941 return len(self.datas) 942 943 944class BaseHTTPRequestHandlerTestCase(unittest.TestCase): 945 """Test the functionality of the BaseHTTPServer. 946 947 Test the support for the Expect 100-continue header. 948 """ 949 950 HTTPResponseMatch = re.compile(b'HTTP/1.[0-9]+ 200 OK') 951 952 def setUp (self): 953 self.handler = SocketlessRequestHandler() 954 955 def send_typical_request(self, message): 956 input = BytesIO(message) 957 output = BytesIO() 958 self.handler.rfile = input 959 self.handler.wfile = output 960 self.handler.handle_one_request() 961 output.seek(0) 962 return output.readlines() 963 964 def verify_get_called(self): 965 self.assertTrue(self.handler.get_called) 966 967 def verify_expected_headers(self, headers): 968 for fieldName in b'Server: ', b'Date: ', b'Content-Type: ': 969 self.assertEqual(sum(h.startswith(fieldName) for h in headers), 1) 970 971 def verify_http_server_response(self, response): 972 match = self.HTTPResponseMatch.search(response) 973 self.assertIsNotNone(match) 974 975 def test_http_1_1(self): 976 result = self.send_typical_request(b'GET / HTTP/1.1\r\n\r\n') 977 self.verify_http_server_response(result[0]) 978 self.verify_expected_headers(result[1:-1]) 979 self.verify_get_called() 980 self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') 981 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1') 982 self.assertEqual(self.handler.command, 'GET') 983 self.assertEqual(self.handler.path, '/') 984 self.assertEqual(self.handler.request_version, 'HTTP/1.1') 985 self.assertSequenceEqual(self.handler.headers.items(), ()) 986 987 def test_http_1_0(self): 988 result = self.send_typical_request(b'GET / HTTP/1.0\r\n\r\n') 989 self.verify_http_server_response(result[0]) 990 self.verify_expected_headers(result[1:-1]) 991 self.verify_get_called() 992 self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') 993 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.0') 994 self.assertEqual(self.handler.command, 'GET') 995 self.assertEqual(self.handler.path, '/') 996 self.assertEqual(self.handler.request_version, 'HTTP/1.0') 997 self.assertSequenceEqual(self.handler.headers.items(), ()) 998 999 def test_http_0_9(self): 1000 result = self.send_typical_request(b'GET / HTTP/0.9\r\n\r\n') 1001 self.assertEqual(len(result), 1) 1002 self.assertEqual(result[0], b'<html><body>Data</body></html>\r\n') 1003 self.verify_get_called() 1004 1005 def test_extra_space(self): 1006 result = self.send_typical_request( 1007 b'GET /spaced out HTTP/1.1\r\n' 1008 b'Host: dummy\r\n' 1009 b'\r\n' 1010 ) 1011 self.assertTrue(result[0].startswith(b'HTTP/1.1 400 ')) 1012 self.verify_expected_headers(result[1:result.index(b'\r\n')]) 1013 self.assertFalse(self.handler.get_called) 1014 1015 def test_with_continue_1_0(self): 1016 result = self.send_typical_request(b'GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n') 1017 self.verify_http_server_response(result[0]) 1018 self.verify_expected_headers(result[1:-1]) 1019 self.verify_get_called() 1020 self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') 1021 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.0') 1022 self.assertEqual(self.handler.command, 'GET') 1023 self.assertEqual(self.handler.path, '/') 1024 self.assertEqual(self.handler.request_version, 'HTTP/1.0') 1025 headers = (("Expect", "100-continue"),) 1026 self.assertSequenceEqual(self.handler.headers.items(), headers) 1027 1028 def test_with_continue_1_1(self): 1029 result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n') 1030 self.assertEqual(result[0], b'HTTP/1.1 100 Continue\r\n') 1031 self.assertEqual(result[1], b'\r\n') 1032 self.assertEqual(result[2], b'HTTP/1.1 200 OK\r\n') 1033 self.verify_expected_headers(result[2:-1]) 1034 self.verify_get_called() 1035 self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') 1036 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1') 1037 self.assertEqual(self.handler.command, 'GET') 1038 self.assertEqual(self.handler.path, '/') 1039 self.assertEqual(self.handler.request_version, 'HTTP/1.1') 1040 headers = (("Expect", "100-continue"),) 1041 self.assertSequenceEqual(self.handler.headers.items(), headers) 1042 1043 def test_header_buffering_of_send_error(self): 1044 1045 input = BytesIO(b'GET / HTTP/1.1\r\n\r\n') 1046 output = AuditableBytesIO() 1047 handler = SocketlessRequestHandler() 1048 handler.rfile = input 1049 handler.wfile = output 1050 handler.request_version = 'HTTP/1.1' 1051 handler.requestline = '' 1052 handler.command = None 1053 1054 handler.send_error(418) 1055 self.assertEqual(output.numWrites, 2) 1056 1057 def test_header_buffering_of_send_response_only(self): 1058 1059 input = BytesIO(b'GET / HTTP/1.1\r\n\r\n') 1060 output = AuditableBytesIO() 1061 handler = SocketlessRequestHandler() 1062 handler.rfile = input 1063 handler.wfile = output 1064 handler.request_version = 'HTTP/1.1' 1065 1066 handler.send_response_only(418) 1067 self.assertEqual(output.numWrites, 0) 1068 handler.end_headers() 1069 self.assertEqual(output.numWrites, 1) 1070 1071 def test_header_buffering_of_send_header(self): 1072 1073 input = BytesIO(b'GET / HTTP/1.1\r\n\r\n') 1074 output = AuditableBytesIO() 1075 handler = SocketlessRequestHandler() 1076 handler.rfile = input 1077 handler.wfile = output 1078 handler.request_version = 'HTTP/1.1' 1079 1080 handler.send_header('Foo', 'foo') 1081 handler.send_header('bar', 'bar') 1082 self.assertEqual(output.numWrites, 0) 1083 handler.end_headers() 1084 self.assertEqual(output.getData(), b'Foo: foo\r\nbar: bar\r\n\r\n') 1085 self.assertEqual(output.numWrites, 1) 1086 1087 def test_header_unbuffered_when_continue(self): 1088 1089 def _readAndReseek(f): 1090 pos = f.tell() 1091 f.seek(0) 1092 data = f.read() 1093 f.seek(pos) 1094 return data 1095 1096 input = BytesIO(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n') 1097 output = BytesIO() 1098 self.handler.rfile = input 1099 self.handler.wfile = output 1100 self.handler.request_version = 'HTTP/1.1' 1101 1102 self.handler.handle_one_request() 1103 self.assertNotEqual(_readAndReseek(output), b'') 1104 result = _readAndReseek(output).split(b'\r\n') 1105 self.assertEqual(result[0], b'HTTP/1.1 100 Continue') 1106 self.assertEqual(result[1], b'') 1107 self.assertEqual(result[2], b'HTTP/1.1 200 OK') 1108 1109 def test_with_continue_rejected(self): 1110 usual_handler = self.handler # Save to avoid breaking any subsequent tests. 1111 self.handler = RejectingSocketlessRequestHandler() 1112 result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n') 1113 self.assertEqual(result[0], b'HTTP/1.1 417 Expectation Failed\r\n') 1114 self.verify_expected_headers(result[1:-1]) 1115 # The expect handler should short circuit the usual get method by 1116 # returning false here, so get_called should be false 1117 self.assertFalse(self.handler.get_called) 1118 self.assertEqual(sum(r == b'Connection: close\r\n' for r in result[1:-1]), 1) 1119 self.handler = usual_handler # Restore to avoid breaking any subsequent tests. 1120 1121 def test_request_length(self): 1122 # Issue #10714: huge request lines are discarded, to avoid Denial 1123 # of Service attacks. 1124 result = self.send_typical_request(b'GET ' + b'x' * 65537) 1125 self.assertEqual(result[0], b'HTTP/1.1 414 Request-URI Too Long\r\n') 1126 self.assertFalse(self.handler.get_called) 1127 self.assertIsInstance(self.handler.requestline, str) 1128 1129 def test_header_length(self): 1130 # Issue #6791: same for headers 1131 result = self.send_typical_request( 1132 b'GET / HTTP/1.1\r\nX-Foo: bar' + b'r' * 65537 + b'\r\n\r\n') 1133 self.assertEqual(result[0], b'HTTP/1.1 431 Line too long\r\n') 1134 self.assertFalse(self.handler.get_called) 1135 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1') 1136 1137 def test_too_many_headers(self): 1138 result = self.send_typical_request( 1139 b'GET / HTTP/1.1\r\n' + b'X-Foo: bar\r\n' * 101 + b'\r\n') 1140 self.assertEqual(result[0], b'HTTP/1.1 431 Too many headers\r\n') 1141 self.assertFalse(self.handler.get_called) 1142 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1') 1143 1144 def test_html_escape_on_error(self): 1145 result = self.send_typical_request( 1146 b'<script>alert("hello")</script> / HTTP/1.1') 1147 result = b''.join(result) 1148 text = '<script>alert("hello")</script>' 1149 self.assertIn(html.escape(text, quote=False).encode('ascii'), result) 1150 1151 def test_close_connection(self): 1152 # handle_one_request() should be repeatedly called until 1153 # it sets close_connection 1154 def handle_one_request(): 1155 self.handler.close_connection = next(close_values) 1156 self.handler.handle_one_request = handle_one_request 1157 1158 close_values = iter((True,)) 1159 self.handler.handle() 1160 self.assertRaises(StopIteration, next, close_values) 1161 1162 close_values = iter((False, False, True)) 1163 self.handler.handle() 1164 self.assertRaises(StopIteration, next, close_values) 1165 1166 def test_date_time_string(self): 1167 now = time.time() 1168 # this is the old code that formats the timestamp 1169 year, month, day, hh, mm, ss, wd, y, z = time.gmtime(now) 1170 expected = "%s, %02d %3s %4d %02d:%02d:%02d GMT" % ( 1171 self.handler.weekdayname[wd], 1172 day, 1173 self.handler.monthname[month], 1174 year, hh, mm, ss 1175 ) 1176 self.assertEqual(self.handler.date_time_string(timestamp=now), expected) 1177 1178 1179class SimpleHTTPRequestHandlerTestCase(unittest.TestCase): 1180 """ Test url parsing """ 1181 def setUp(self): 1182 self.translated_1 = os.path.join(os.getcwd(), 'filename') 1183 self.translated_2 = os.path.join('foo', 'filename') 1184 self.translated_3 = os.path.join('bar', 'filename') 1185 self.handler_1 = SocketlessRequestHandler() 1186 self.handler_2 = SocketlessRequestHandler(directory='foo') 1187 self.handler_3 = SocketlessRequestHandler(directory=pathlib.PurePath('bar')) 1188 1189 def test_query_arguments(self): 1190 path = self.handler_1.translate_path('/filename') 1191 self.assertEqual(path, self.translated_1) 1192 path = self.handler_2.translate_path('/filename') 1193 self.assertEqual(path, self.translated_2) 1194 path = self.handler_3.translate_path('/filename') 1195 self.assertEqual(path, self.translated_3) 1196 1197 path = self.handler_1.translate_path('/filename?foo=bar') 1198 self.assertEqual(path, self.translated_1) 1199 path = self.handler_2.translate_path('/filename?foo=bar') 1200 self.assertEqual(path, self.translated_2) 1201 path = self.handler_3.translate_path('/filename?foo=bar') 1202 self.assertEqual(path, self.translated_3) 1203 1204 path = self.handler_1.translate_path('/filename?a=b&spam=eggs#zot') 1205 self.assertEqual(path, self.translated_1) 1206 path = self.handler_2.translate_path('/filename?a=b&spam=eggs#zot') 1207 self.assertEqual(path, self.translated_2) 1208 path = self.handler_3.translate_path('/filename?a=b&spam=eggs#zot') 1209 self.assertEqual(path, self.translated_3) 1210 1211 def test_start_with_double_slash(self): 1212 path = self.handler_1.translate_path('//filename') 1213 self.assertEqual(path, self.translated_1) 1214 path = self.handler_2.translate_path('//filename') 1215 self.assertEqual(path, self.translated_2) 1216 path = self.handler_3.translate_path('//filename') 1217 self.assertEqual(path, self.translated_3) 1218 1219 path = self.handler_1.translate_path('//filename?foo=bar') 1220 self.assertEqual(path, self.translated_1) 1221 path = self.handler_2.translate_path('//filename?foo=bar') 1222 self.assertEqual(path, self.translated_2) 1223 path = self.handler_3.translate_path('//filename?foo=bar') 1224 self.assertEqual(path, self.translated_3) 1225 1226 def test_windows_colon(self): 1227 with support.swap_attr(server.os, 'path', ntpath): 1228 path = self.handler_1.translate_path('c:c:c:foo/filename') 1229 path = path.replace(ntpath.sep, os.sep) 1230 self.assertEqual(path, self.translated_1) 1231 path = self.handler_2.translate_path('c:c:c:foo/filename') 1232 path = path.replace(ntpath.sep, os.sep) 1233 self.assertEqual(path, self.translated_2) 1234 path = self.handler_3.translate_path('c:c:c:foo/filename') 1235 path = path.replace(ntpath.sep, os.sep) 1236 self.assertEqual(path, self.translated_3) 1237 1238 path = self.handler_1.translate_path('\\c:../filename') 1239 path = path.replace(ntpath.sep, os.sep) 1240 self.assertEqual(path, self.translated_1) 1241 path = self.handler_2.translate_path('\\c:../filename') 1242 path = path.replace(ntpath.sep, os.sep) 1243 self.assertEqual(path, self.translated_2) 1244 path = self.handler_3.translate_path('\\c:../filename') 1245 path = path.replace(ntpath.sep, os.sep) 1246 self.assertEqual(path, self.translated_3) 1247 1248 path = self.handler_1.translate_path('c:\\c:..\\foo/filename') 1249 path = path.replace(ntpath.sep, os.sep) 1250 self.assertEqual(path, self.translated_1) 1251 path = self.handler_2.translate_path('c:\\c:..\\foo/filename') 1252 path = path.replace(ntpath.sep, os.sep) 1253 self.assertEqual(path, self.translated_2) 1254 path = self.handler_3.translate_path('c:\\c:..\\foo/filename') 1255 path = path.replace(ntpath.sep, os.sep) 1256 self.assertEqual(path, self.translated_3) 1257 1258 path = self.handler_1.translate_path('c:c:foo\\c:c:bar/filename') 1259 path = path.replace(ntpath.sep, os.sep) 1260 self.assertEqual(path, self.translated_1) 1261 path = self.handler_2.translate_path('c:c:foo\\c:c:bar/filename') 1262 path = path.replace(ntpath.sep, os.sep) 1263 self.assertEqual(path, self.translated_2) 1264 path = self.handler_3.translate_path('c:c:foo\\c:c:bar/filename') 1265 path = path.replace(ntpath.sep, os.sep) 1266 self.assertEqual(path, self.translated_3) 1267 1268 1269class MiscTestCase(unittest.TestCase): 1270 def test_all(self): 1271 expected = [] 1272 blacklist = {'executable', 'nobody_uid', 'test'} 1273 for name in dir(server): 1274 if name.startswith('_') or name in blacklist: 1275 continue 1276 module_object = getattr(server, name) 1277 if getattr(module_object, '__module__', None) == 'http.server': 1278 expected.append(name) 1279 self.assertCountEqual(server.__all__, expected) 1280 1281 1282class ScriptTestCase(unittest.TestCase): 1283 1284 def mock_server_class(self): 1285 return mock.MagicMock( 1286 return_value=mock.MagicMock( 1287 __enter__=mock.MagicMock( 1288 return_value=mock.MagicMock( 1289 socket=mock.MagicMock( 1290 getsockname=lambda: ('', 0), 1291 ), 1292 ), 1293 ), 1294 ), 1295 ) 1296 1297 @mock.patch('builtins.print') 1298 def test_server_test_unspec(self, _): 1299 mock_server = self.mock_server_class() 1300 server.test(ServerClass=mock_server, bind=None) 1301 self.assertIn( 1302 mock_server.address_family, 1303 (socket.AF_INET6, socket.AF_INET), 1304 ) 1305 1306 @mock.patch('builtins.print') 1307 def test_server_test_localhost(self, _): 1308 mock_server = self.mock_server_class() 1309 server.test(ServerClass=mock_server, bind="localhost") 1310 self.assertIn( 1311 mock_server.address_family, 1312 (socket.AF_INET6, socket.AF_INET), 1313 ) 1314 1315 ipv6_addrs = ( 1316 "::", 1317 "2001:0db8:85a3:0000:0000:8a2e:0370:7334", 1318 "::1", 1319 ) 1320 1321 ipv4_addrs = ( 1322 "0.0.0.0", 1323 "8.8.8.8", 1324 "127.0.0.1", 1325 ) 1326 1327 @mock.patch('builtins.print') 1328 def test_server_test_ipv6(self, _): 1329 for bind in self.ipv6_addrs: 1330 mock_server = self.mock_server_class() 1331 server.test(ServerClass=mock_server, bind=bind) 1332 self.assertEqual(mock_server.address_family, socket.AF_INET6) 1333 1334 @mock.patch('builtins.print') 1335 def test_server_test_ipv4(self, _): 1336 for bind in self.ipv4_addrs: 1337 mock_server = self.mock_server_class() 1338 server.test(ServerClass=mock_server, bind=bind) 1339 self.assertEqual(mock_server.address_family, socket.AF_INET) 1340 1341 1342def test_main(verbose=None): 1343 cwd = os.getcwd() 1344 try: 1345 support.run_unittest( 1346 RequestHandlerLoggingTestCase, 1347 BaseHTTPRequestHandlerTestCase, 1348 BaseHTTPServerTestCase, 1349 SimpleHTTPServerTestCase, 1350 CGIHTTPServerTestCase, 1351 SimpleHTTPRequestHandlerTestCase, 1352 MiscTestCase, 1353 ScriptTestCase 1354 ) 1355 finally: 1356 os.chdir(cwd) 1357 1358if __name__ == '__main__': 1359 test_main() 1360