1"""Unittests for the various HTTPServer modules. 2 3Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>, 4Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest. 5""" 6from collections import OrderedDict 7from http.server import BaseHTTPRequestHandler, HTTPServer, \ 8 SimpleHTTPRequestHandler, CGIHTTPRequestHandler 9from http import server, HTTPStatus 10 11import os 12import socket 13import sys 14import re 15import base64 16import ntpath 17import pathlib 18import shutil 19import email.message 20import email.utils 21import html 22import http, http.client 23import urllib.parse 24import tempfile 25import time 26import datetime 27import threading 28from unittest import mock 29from io import BytesIO 30 31import unittest 32from test import support 33from test.support import os_helper 34from test.support import threading_helper 35 36 37class NoLogRequestHandler: 38 def log_message(self, *args): 39 # don't write log messages to stderr 40 pass 41 42 def read(self, n=None): 43 return '' 44 45 46class TestServerThread(threading.Thread): 47 def __init__(self, test_object, request_handler): 48 threading.Thread.__init__(self) 49 self.request_handler = request_handler 50 self.test_object = test_object 51 52 def run(self): 53 self.server = HTTPServer(('localhost', 0), self.request_handler) 54 self.test_object.HOST, self.test_object.PORT = self.server.socket.getsockname() 55 self.test_object.server_started.set() 56 self.test_object = None 57 try: 58 self.server.serve_forever(0.05) 59 finally: 60 self.server.server_close() 61 62 def stop(self): 63 self.server.shutdown() 64 self.join() 65 66 67class BaseTestCase(unittest.TestCase): 68 def setUp(self): 69 self._threads = threading_helper.threading_setup() 70 os.environ = os_helper.EnvironmentVarGuard() 71 self.server_started = threading.Event() 72 self.thread = TestServerThread(self, self.request_handler) 73 self.thread.start() 74 self.server_started.wait() 75 76 def tearDown(self): 77 self.thread.stop() 78 self.thread = None 79 os.environ.__exit__() 80 threading_helper.threading_cleanup(*self._threads) 81 82 def request(self, uri, method='GET', body=None, headers={}): 83 self.connection = http.client.HTTPConnection(self.HOST, self.PORT) 84 self.connection.request(method, uri, body, headers) 85 return self.connection.getresponse() 86 87 88class BaseHTTPServerTestCase(BaseTestCase): 89 class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler): 90 protocol_version = 'HTTP/1.1' 91 default_request_version = 'HTTP/1.1' 92 93 def do_TEST(self): 94 self.send_response(HTTPStatus.NO_CONTENT) 95 self.send_header('Content-Type', 'text/html') 96 self.send_header('Connection', 'close') 97 self.end_headers() 98 99 def do_KEEP(self): 100 self.send_response(HTTPStatus.NO_CONTENT) 101 self.send_header('Content-Type', 'text/html') 102 self.send_header('Connection', 'keep-alive') 103 self.end_headers() 104 105 def do_KEYERROR(self): 106 self.send_error(999) 107 108 def do_NOTFOUND(self): 109 self.send_error(HTTPStatus.NOT_FOUND) 110 111 def do_EXPLAINERROR(self): 112 self.send_error(999, "Short Message", 113 "This is a long \n explanation") 114 115 def do_CUSTOM(self): 116 self.send_response(999) 117 self.send_header('Content-Type', 'text/html') 118 self.send_header('Connection', 'close') 119 self.end_headers() 120 121 def do_LATINONEHEADER(self): 122 self.send_response(999) 123 self.send_header('X-Special', 'Dängerous Mind') 124 self.send_header('Connection', 'close') 125 self.end_headers() 126 body = self.headers['x-special-incoming'].encode('utf-8') 127 self.wfile.write(body) 128 129 def do_SEND_ERROR(self): 130 self.send_error(int(self.path[1:])) 131 132 def do_HEAD(self): 133 self.send_error(int(self.path[1:])) 134 135 def setUp(self): 136 BaseTestCase.setUp(self) 137 self.con = http.client.HTTPConnection(self.HOST, self.PORT) 138 self.con.connect() 139 140 def test_command(self): 141 self.con.request('GET', '/') 142 res = self.con.getresponse() 143 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 144 145 def test_request_line_trimming(self): 146 self.con._http_vsn_str = 'HTTP/1.1\n' 147 self.con.putrequest('XYZBOGUS', '/') 148 self.con.endheaders() 149 res = self.con.getresponse() 150 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 151 152 def test_version_bogus(self): 153 self.con._http_vsn_str = 'FUBAR' 154 self.con.putrequest('GET', '/') 155 self.con.endheaders() 156 res = self.con.getresponse() 157 self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) 158 159 def test_version_digits(self): 160 self.con._http_vsn_str = 'HTTP/9.9.9' 161 self.con.putrequest('GET', '/') 162 self.con.endheaders() 163 res = self.con.getresponse() 164 self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) 165 166 def test_version_none_get(self): 167 self.con._http_vsn_str = '' 168 self.con.putrequest('GET', '/') 169 self.con.endheaders() 170 res = self.con.getresponse() 171 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 172 173 def test_version_none(self): 174 # Test that a valid method is rejected when not HTTP/1.x 175 self.con._http_vsn_str = '' 176 self.con.putrequest('CUSTOM', '/') 177 self.con.endheaders() 178 res = self.con.getresponse() 179 self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) 180 181 def test_version_invalid(self): 182 self.con._http_vsn = 99 183 self.con._http_vsn_str = 'HTTP/9.9' 184 self.con.putrequest('GET', '/') 185 self.con.endheaders() 186 res = self.con.getresponse() 187 self.assertEqual(res.status, HTTPStatus.HTTP_VERSION_NOT_SUPPORTED) 188 189 def test_send_blank(self): 190 self.con._http_vsn_str = '' 191 self.con.putrequest('', '') 192 self.con.endheaders() 193 res = self.con.getresponse() 194 self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) 195 196 def test_header_close(self): 197 self.con.putrequest('GET', '/') 198 self.con.putheader('Connection', 'close') 199 self.con.endheaders() 200 res = self.con.getresponse() 201 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 202 203 def test_header_keep_alive(self): 204 self.con._http_vsn_str = 'HTTP/1.1' 205 self.con.putrequest('GET', '/') 206 self.con.putheader('Connection', 'keep-alive') 207 self.con.endheaders() 208 res = self.con.getresponse() 209 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 210 211 def test_handler(self): 212 self.con.request('TEST', '/') 213 res = self.con.getresponse() 214 self.assertEqual(res.status, HTTPStatus.NO_CONTENT) 215 216 def test_return_header_keep_alive(self): 217 self.con.request('KEEP', '/') 218 res = self.con.getresponse() 219 self.assertEqual(res.getheader('Connection'), 'keep-alive') 220 self.con.request('TEST', '/') 221 self.addCleanup(self.con.close) 222 223 def test_internal_key_error(self): 224 self.con.request('KEYERROR', '/') 225 res = self.con.getresponse() 226 self.assertEqual(res.status, 999) 227 228 def test_return_custom_status(self): 229 self.con.request('CUSTOM', '/') 230 res = self.con.getresponse() 231 self.assertEqual(res.status, 999) 232 233 def test_return_explain_error(self): 234 self.con.request('EXPLAINERROR', '/') 235 res = self.con.getresponse() 236 self.assertEqual(res.status, 999) 237 self.assertTrue(int(res.getheader('Content-Length'))) 238 239 def test_latin1_header(self): 240 self.con.request('LATINONEHEADER', '/', headers={ 241 'X-Special-Incoming': 'Ärger mit Unicode' 242 }) 243 res = self.con.getresponse() 244 self.assertEqual(res.getheader('X-Special'), 'Dängerous Mind') 245 self.assertEqual(res.read(), 'Ärger mit Unicode'.encode('utf-8')) 246 247 def test_error_content_length(self): 248 # Issue #16088: standard error responses should have a content-length 249 self.con.request('NOTFOUND', '/') 250 res = self.con.getresponse() 251 self.assertEqual(res.status, HTTPStatus.NOT_FOUND) 252 253 data = res.read() 254 self.assertEqual(int(res.getheader('Content-Length')), len(data)) 255 256 def test_send_error(self): 257 allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED, 258 HTTPStatus.RESET_CONTENT) 259 for code in (HTTPStatus.NO_CONTENT, HTTPStatus.NOT_MODIFIED, 260 HTTPStatus.PROCESSING, HTTPStatus.RESET_CONTENT, 261 HTTPStatus.SWITCHING_PROTOCOLS): 262 self.con.request('SEND_ERROR', '/{}'.format(code)) 263 res = self.con.getresponse() 264 self.assertEqual(code, res.status) 265 self.assertEqual(None, res.getheader('Content-Length')) 266 self.assertEqual(None, res.getheader('Content-Type')) 267 if code not in allow_transfer_encoding_codes: 268 self.assertEqual(None, res.getheader('Transfer-Encoding')) 269 270 data = res.read() 271 self.assertEqual(b'', data) 272 273 def test_head_via_send_error(self): 274 allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED, 275 HTTPStatus.RESET_CONTENT) 276 for code in (HTTPStatus.OK, HTTPStatus.NO_CONTENT, 277 HTTPStatus.NOT_MODIFIED, HTTPStatus.RESET_CONTENT, 278 HTTPStatus.SWITCHING_PROTOCOLS): 279 self.con.request('HEAD', '/{}'.format(code)) 280 res = self.con.getresponse() 281 self.assertEqual(code, res.status) 282 if code == HTTPStatus.OK: 283 self.assertTrue(int(res.getheader('Content-Length')) > 0) 284 self.assertIn('text/html', res.getheader('Content-Type')) 285 else: 286 self.assertEqual(None, res.getheader('Content-Length')) 287 self.assertEqual(None, res.getheader('Content-Type')) 288 if code not in allow_transfer_encoding_codes: 289 self.assertEqual(None, res.getheader('Transfer-Encoding')) 290 291 data = res.read() 292 self.assertEqual(b'', data) 293 294 295class RequestHandlerLoggingTestCase(BaseTestCase): 296 class request_handler(BaseHTTPRequestHandler): 297 protocol_version = 'HTTP/1.1' 298 default_request_version = 'HTTP/1.1' 299 300 def do_GET(self): 301 self.send_response(HTTPStatus.OK) 302 self.end_headers() 303 304 def do_ERROR(self): 305 self.send_error(HTTPStatus.NOT_FOUND, 'File not found') 306 307 def test_get(self): 308 self.con = http.client.HTTPConnection(self.HOST, self.PORT) 309 self.con.connect() 310 311 with support.captured_stderr() as err: 312 self.con.request('GET', '/') 313 self.con.getresponse() 314 315 self.assertTrue( 316 err.getvalue().endswith('"GET / HTTP/1.1" 200 -\n')) 317 318 def test_err(self): 319 self.con = http.client.HTTPConnection(self.HOST, self.PORT) 320 self.con.connect() 321 322 with support.captured_stderr() as err: 323 self.con.request('ERROR', '/') 324 self.con.getresponse() 325 326 lines = err.getvalue().split('\n') 327 self.assertTrue(lines[0].endswith('code 404, message File not found')) 328 self.assertTrue(lines[1].endswith('"ERROR / HTTP/1.1" 404 -')) 329 330 331class SimpleHTTPServerTestCase(BaseTestCase): 332 class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler): 333 pass 334 335 def setUp(self): 336 BaseTestCase.setUp(self) 337 self.cwd = os.getcwd() 338 basetempdir = tempfile.gettempdir() 339 os.chdir(basetempdir) 340 self.data = b'We are the knights who say Ni!' 341 self.tempdir = tempfile.mkdtemp(dir=basetempdir) 342 self.tempdir_name = os.path.basename(self.tempdir) 343 self.base_url = '/' + self.tempdir_name 344 tempname = os.path.join(self.tempdir, 'test') 345 with open(tempname, 'wb') as temp: 346 temp.write(self.data) 347 temp.flush() 348 mtime = os.stat(tempname).st_mtime 349 # compute last modification datetime for browser cache tests 350 last_modif = datetime.datetime.fromtimestamp(mtime, 351 datetime.timezone.utc) 352 self.last_modif_datetime = last_modif.replace(microsecond=0) 353 self.last_modif_header = email.utils.formatdate( 354 last_modif.timestamp(), usegmt=True) 355 356 def tearDown(self): 357 try: 358 os.chdir(self.cwd) 359 try: 360 shutil.rmtree(self.tempdir) 361 except: 362 pass 363 finally: 364 BaseTestCase.tearDown(self) 365 366 def check_status_and_reason(self, response, status, data=None): 367 def close_conn(): 368 """Don't close reader yet so we can check if there was leftover 369 buffered input""" 370 nonlocal reader 371 reader = response.fp 372 response.fp = None 373 reader = None 374 response._close_conn = close_conn 375 376 body = response.read() 377 self.assertTrue(response) 378 self.assertEqual(response.status, status) 379 self.assertIsNotNone(response.reason) 380 if data: 381 self.assertEqual(data, body) 382 # Ensure the server has not set up a persistent connection, and has 383 # not sent any extra data 384 self.assertEqual(response.version, 10) 385 self.assertEqual(response.msg.get("Connection", "close"), "close") 386 self.assertEqual(reader.read(30), b'', 'Connection should be closed') 387 388 reader.close() 389 return body 390 391 @unittest.skipIf(sys.platform == 'darwin', 392 'undecodable name cannot always be decoded on macOS') 393 @unittest.skipIf(sys.platform == 'win32', 394 'undecodable name cannot be decoded on win32') 395 @unittest.skipUnless(os_helper.TESTFN_UNDECODABLE, 396 'need os_helper.TESTFN_UNDECODABLE') 397 def test_undecodable_filename(self): 398 enc = sys.getfilesystemencoding() 399 filename = os.fsdecode(os_helper.TESTFN_UNDECODABLE) + '.txt' 400 with open(os.path.join(self.tempdir, filename), 'wb') as f: 401 f.write(os_helper.TESTFN_UNDECODABLE) 402 response = self.request(self.base_url + '/') 403 if sys.platform == 'darwin': 404 # On Mac OS the HFS+ filesystem replaces bytes that aren't valid 405 # UTF-8 into a percent-encoded value. 406 for name in os.listdir(self.tempdir): 407 if name != 'test': # Ignore a filename created in setUp(). 408 filename = name 409 break 410 body = self.check_status_and_reason(response, HTTPStatus.OK) 411 quotedname = urllib.parse.quote(filename, errors='surrogatepass') 412 self.assertIn(('href="%s"' % quotedname) 413 .encode(enc, 'surrogateescape'), body) 414 self.assertIn(('>%s<' % html.escape(filename, quote=False)) 415 .encode(enc, 'surrogateescape'), body) 416 response = self.request(self.base_url + '/' + quotedname) 417 self.check_status_and_reason(response, HTTPStatus.OK, 418 data=os_helper.TESTFN_UNDECODABLE) 419 420 def test_get(self): 421 #constructs the path relative to the root directory of the HTTPServer 422 response = self.request(self.base_url + '/test') 423 self.check_status_and_reason(response, HTTPStatus.OK, data=self.data) 424 # check for trailing "/" which should return 404. See Issue17324 425 response = self.request(self.base_url + '/test/') 426 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 427 response = self.request(self.base_url + '/') 428 self.check_status_and_reason(response, HTTPStatus.OK) 429 response = self.request(self.base_url) 430 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 431 self.assertEqual(response.getheader("Content-Length"), "0") 432 response = self.request(self.base_url + '/?hi=2') 433 self.check_status_and_reason(response, HTTPStatus.OK) 434 response = self.request(self.base_url + '?hi=1') 435 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 436 self.assertEqual(response.getheader("Location"), 437 self.base_url + "/?hi=1") 438 response = self.request('/ThisDoesNotExist') 439 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 440 response = self.request('/' + 'ThisDoesNotExist' + '/') 441 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 442 443 data = b"Dummy index file\r\n" 444 with open(os.path.join(self.tempdir_name, 'index.html'), 'wb') as f: 445 f.write(data) 446 response = self.request(self.base_url + '/') 447 self.check_status_and_reason(response, HTTPStatus.OK, data) 448 449 # chmod() doesn't work as expected on Windows, and filesystem 450 # permissions are ignored by root on Unix. 451 if os.name == 'posix' and os.geteuid() != 0: 452 os.chmod(self.tempdir, 0) 453 try: 454 response = self.request(self.base_url + '/') 455 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 456 finally: 457 os.chmod(self.tempdir, 0o755) 458 459 def test_head(self): 460 response = self.request( 461 self.base_url + '/test', method='HEAD') 462 self.check_status_and_reason(response, HTTPStatus.OK) 463 self.assertEqual(response.getheader('content-length'), 464 str(len(self.data))) 465 self.assertEqual(response.getheader('content-type'), 466 'application/octet-stream') 467 468 def test_browser_cache(self): 469 """Check that when a request to /test is sent with the request header 470 If-Modified-Since set to date of last modification, the server returns 471 status code 304, not 200 472 """ 473 headers = email.message.Message() 474 headers['If-Modified-Since'] = self.last_modif_header 475 response = self.request(self.base_url + '/test', headers=headers) 476 self.check_status_and_reason(response, HTTPStatus.NOT_MODIFIED) 477 478 # one hour after last modification : must return 304 479 new_dt = self.last_modif_datetime + datetime.timedelta(hours=1) 480 headers = email.message.Message() 481 headers['If-Modified-Since'] = email.utils.format_datetime(new_dt, 482 usegmt=True) 483 response = self.request(self.base_url + '/test', headers=headers) 484 self.check_status_and_reason(response, HTTPStatus.NOT_MODIFIED) 485 486 def test_browser_cache_file_changed(self): 487 # with If-Modified-Since earlier than Last-Modified, must return 200 488 dt = self.last_modif_datetime 489 # build datetime object : 365 days before last modification 490 old_dt = dt - datetime.timedelta(days=365) 491 headers = email.message.Message() 492 headers['If-Modified-Since'] = email.utils.format_datetime(old_dt, 493 usegmt=True) 494 response = self.request(self.base_url + '/test', headers=headers) 495 self.check_status_and_reason(response, HTTPStatus.OK) 496 497 def test_browser_cache_with_If_None_Match_header(self): 498 # if If-None-Match header is present, ignore If-Modified-Since 499 500 headers = email.message.Message() 501 headers['If-Modified-Since'] = self.last_modif_header 502 headers['If-None-Match'] = "*" 503 response = self.request(self.base_url + '/test', headers=headers) 504 self.check_status_and_reason(response, HTTPStatus.OK) 505 506 def test_invalid_requests(self): 507 response = self.request('/', method='FOO') 508 self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED) 509 # requests must be case sensitive,so this should fail too 510 response = self.request('/', method='custom') 511 self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED) 512 response = self.request('/', method='GETs') 513 self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED) 514 515 def test_last_modified(self): 516 """Checks that the datetime returned in Last-Modified response header 517 is the actual datetime of last modification, rounded to the second 518 """ 519 response = self.request(self.base_url + '/test') 520 self.check_status_and_reason(response, HTTPStatus.OK, data=self.data) 521 last_modif_header = response.headers['Last-modified'] 522 self.assertEqual(last_modif_header, self.last_modif_header) 523 524 def test_path_without_leading_slash(self): 525 response = self.request(self.tempdir_name + '/test') 526 self.check_status_and_reason(response, HTTPStatus.OK, data=self.data) 527 response = self.request(self.tempdir_name + '/test/') 528 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 529 response = self.request(self.tempdir_name + '/') 530 self.check_status_and_reason(response, HTTPStatus.OK) 531 response = self.request(self.tempdir_name) 532 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 533 response = self.request(self.tempdir_name + '/?hi=2') 534 self.check_status_and_reason(response, HTTPStatus.OK) 535 response = self.request(self.tempdir_name + '?hi=1') 536 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 537 self.assertEqual(response.getheader("Location"), 538 self.tempdir_name + "/?hi=1") 539 540 def test_html_escape_filename(self): 541 filename = '<test&>.txt' 542 fullpath = os.path.join(self.tempdir, filename) 543 544 try: 545 open(fullpath, 'wb').close() 546 except OSError: 547 raise unittest.SkipTest('Can not create file %s on current file ' 548 'system' % filename) 549 550 try: 551 response = self.request(self.base_url + '/') 552 body = self.check_status_and_reason(response, HTTPStatus.OK) 553 enc = response.headers.get_content_charset() 554 finally: 555 os.unlink(fullpath) # avoid affecting test_undecodable_filename 556 557 self.assertIsNotNone(enc) 558 html_text = '>%s<' % html.escape(filename, quote=False) 559 self.assertIn(html_text.encode(enc), body) 560 561 562cgi_file1 = """\ 563#!%s 564 565print("Content-type: text/html") 566print() 567print("Hello World") 568""" 569 570cgi_file2 = """\ 571#!%s 572import cgi 573 574print("Content-type: text/html") 575print() 576 577form = cgi.FieldStorage() 578print("%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"), 579 form.getfirst("bacon"))) 580""" 581 582cgi_file4 = """\ 583#!%s 584import os 585 586print("Content-type: text/html") 587print() 588 589print(os.environ["%s"]) 590""" 591 592cgi_file6 = """\ 593#!%s 594import os 595 596print("X-ambv: was here") 597print("Content-type: text/html") 598print() 599print("<pre>") 600for k, v in os.environ.items(): 601 try: 602 k.encode('ascii') 603 v.encode('ascii') 604 except UnicodeEncodeError: 605 continue # see: BPO-44647 606 print(f"{k}={v}") 607print("</pre>") 608""" 609 610 611@unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0, 612 "This test can't be run reliably as root (issue #13308).") 613class CGIHTTPServerTestCase(BaseTestCase): 614 class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler): 615 pass 616 617 linesep = os.linesep.encode('ascii') 618 619 def setUp(self): 620 BaseTestCase.setUp(self) 621 self.cwd = os.getcwd() 622 self.parent_dir = tempfile.mkdtemp() 623 self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin') 624 self.cgi_child_dir = os.path.join(self.cgi_dir, 'child-dir') 625 self.sub_dir_1 = os.path.join(self.parent_dir, 'sub') 626 self.sub_dir_2 = os.path.join(self.sub_dir_1, 'dir') 627 self.cgi_dir_in_sub_dir = os.path.join(self.sub_dir_2, 'cgi-bin') 628 os.mkdir(self.cgi_dir) 629 os.mkdir(self.cgi_child_dir) 630 os.mkdir(self.sub_dir_1) 631 os.mkdir(self.sub_dir_2) 632 os.mkdir(self.cgi_dir_in_sub_dir) 633 self.nocgi_path = None 634 self.file1_path = None 635 self.file2_path = None 636 self.file3_path = None 637 self.file4_path = None 638 self.file5_path = None 639 640 # The shebang line should be pure ASCII: use symlink if possible. 641 # See issue #7668. 642 self._pythonexe_symlink = None 643 if os_helper.can_symlink(): 644 self.pythonexe = os.path.join(self.parent_dir, 'python') 645 self._pythonexe_symlink = support.PythonSymlink(self.pythonexe).__enter__() 646 else: 647 self.pythonexe = sys.executable 648 649 try: 650 # The python executable path is written as the first line of the 651 # CGI Python script. The encoding cookie cannot be used, and so the 652 # path should be encodable to the default script encoding (utf-8) 653 self.pythonexe.encode('utf-8') 654 except UnicodeEncodeError: 655 self.tearDown() 656 self.skipTest("Python executable path is not encodable to utf-8") 657 658 self.nocgi_path = os.path.join(self.parent_dir, 'nocgi.py') 659 with open(self.nocgi_path, 'w', encoding='utf-8') as fp: 660 fp.write(cgi_file1 % self.pythonexe) 661 os.chmod(self.nocgi_path, 0o777) 662 663 self.file1_path = os.path.join(self.cgi_dir, 'file1.py') 664 with open(self.file1_path, 'w', encoding='utf-8') as file1: 665 file1.write(cgi_file1 % self.pythonexe) 666 os.chmod(self.file1_path, 0o777) 667 668 self.file2_path = os.path.join(self.cgi_dir, 'file2.py') 669 with open(self.file2_path, 'w', encoding='utf-8') as file2: 670 file2.write(cgi_file2 % self.pythonexe) 671 os.chmod(self.file2_path, 0o777) 672 673 self.file3_path = os.path.join(self.cgi_child_dir, 'file3.py') 674 with open(self.file3_path, 'w', encoding='utf-8') as file3: 675 file3.write(cgi_file1 % self.pythonexe) 676 os.chmod(self.file3_path, 0o777) 677 678 self.file4_path = os.path.join(self.cgi_dir, 'file4.py') 679 with open(self.file4_path, 'w', encoding='utf-8') as file4: 680 file4.write(cgi_file4 % (self.pythonexe, 'QUERY_STRING')) 681 os.chmod(self.file4_path, 0o777) 682 683 self.file5_path = os.path.join(self.cgi_dir_in_sub_dir, 'file5.py') 684 with open(self.file5_path, 'w', encoding='utf-8') as file5: 685 file5.write(cgi_file1 % self.pythonexe) 686 os.chmod(self.file5_path, 0o777) 687 688 self.file6_path = os.path.join(self.cgi_dir, 'file6.py') 689 with open(self.file6_path, 'w', encoding='utf-8') as file6: 690 file6.write(cgi_file6 % self.pythonexe) 691 os.chmod(self.file6_path, 0o777) 692 693 os.chdir(self.parent_dir) 694 695 def tearDown(self): 696 try: 697 os.chdir(self.cwd) 698 if self._pythonexe_symlink: 699 self._pythonexe_symlink.__exit__(None, None, None) 700 if self.nocgi_path: 701 os.remove(self.nocgi_path) 702 if self.file1_path: 703 os.remove(self.file1_path) 704 if self.file2_path: 705 os.remove(self.file2_path) 706 if self.file3_path: 707 os.remove(self.file3_path) 708 if self.file4_path: 709 os.remove(self.file4_path) 710 if self.file5_path: 711 os.remove(self.file5_path) 712 if self.file6_path: 713 os.remove(self.file6_path) 714 os.rmdir(self.cgi_child_dir) 715 os.rmdir(self.cgi_dir) 716 os.rmdir(self.cgi_dir_in_sub_dir) 717 os.rmdir(self.sub_dir_2) 718 os.rmdir(self.sub_dir_1) 719 os.rmdir(self.parent_dir) 720 finally: 721 BaseTestCase.tearDown(self) 722 723 def test_url_collapse_path(self): 724 # verify tail is the last portion and head is the rest on proper urls 725 test_vectors = { 726 '': '//', 727 '..': IndexError, 728 '/.//..': IndexError, 729 '/': '//', 730 '//': '//', 731 '/\\': '//\\', 732 '/.//': '//', 733 'cgi-bin/file1.py': '/cgi-bin/file1.py', 734 '/cgi-bin/file1.py': '/cgi-bin/file1.py', 735 'a': '//a', 736 '/a': '//a', 737 '//a': '//a', 738 './a': '//a', 739 './C:/': '/C:/', 740 '/a/b': '/a/b', 741 '/a/b/': '/a/b/', 742 '/a/b/.': '/a/b/', 743 '/a/b/c/..': '/a/b/', 744 '/a/b/c/../d': '/a/b/d', 745 '/a/b/c/../d/e/../f': '/a/b/d/f', 746 '/a/b/c/../d/e/../../f': '/a/b/f', 747 '/a/b/c/../d/e/.././././..//f': '/a/b/f', 748 '../a/b/c/../d/e/.././././..//f': IndexError, 749 '/a/b/c/../d/e/../../../f': '/a/f', 750 '/a/b/c/../d/e/../../../../f': '//f', 751 '/a/b/c/../d/e/../../../../../f': IndexError, 752 '/a/b/c/../d/e/../../../../f/..': '//', 753 '/a/b/c/../d/e/../../../../f/../.': '//', 754 } 755 for path, expected in test_vectors.items(): 756 if isinstance(expected, type) and issubclass(expected, Exception): 757 self.assertRaises(expected, 758 server._url_collapse_path, path) 759 else: 760 actual = server._url_collapse_path(path) 761 self.assertEqual(expected, actual, 762 msg='path = %r\nGot: %r\nWanted: %r' % 763 (path, actual, expected)) 764 765 def test_headers_and_content(self): 766 res = self.request('/cgi-bin/file1.py') 767 self.assertEqual( 768 (res.read(), res.getheader('Content-type'), res.status), 769 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK)) 770 771 def test_issue19435(self): 772 res = self.request('///////////nocgi.py/../cgi-bin/nothere.sh') 773 self.assertEqual(res.status, HTTPStatus.NOT_FOUND) 774 775 def test_post(self): 776 params = urllib.parse.urlencode( 777 {'spam' : 1, 'eggs' : 'python', 'bacon' : 123456}) 778 headers = {'Content-type' : 'application/x-www-form-urlencoded'} 779 res = self.request('/cgi-bin/file2.py', 'POST', params, headers) 780 781 self.assertEqual(res.read(), b'1, python, 123456' + self.linesep) 782 783 def test_invaliduri(self): 784 res = self.request('/cgi-bin/invalid') 785 res.read() 786 self.assertEqual(res.status, HTTPStatus.NOT_FOUND) 787 788 def test_authorization(self): 789 headers = {b'Authorization' : b'Basic ' + 790 base64.b64encode(b'username:pass')} 791 res = self.request('/cgi-bin/file1.py', 'GET', headers=headers) 792 self.assertEqual( 793 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 794 (res.read(), res.getheader('Content-type'), res.status)) 795 796 def test_no_leading_slash(self): 797 # http://bugs.python.org/issue2254 798 res = self.request('cgi-bin/file1.py') 799 self.assertEqual( 800 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 801 (res.read(), res.getheader('Content-type'), res.status)) 802 803 def test_os_environ_is_not_altered(self): 804 signature = "Test CGI Server" 805 os.environ['SERVER_SOFTWARE'] = signature 806 res = self.request('/cgi-bin/file1.py') 807 self.assertEqual( 808 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 809 (res.read(), res.getheader('Content-type'), res.status)) 810 self.assertEqual(os.environ['SERVER_SOFTWARE'], signature) 811 812 def test_urlquote_decoding_in_cgi_check(self): 813 res = self.request('/cgi-bin%2ffile1.py') 814 self.assertEqual( 815 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 816 (res.read(), res.getheader('Content-type'), res.status)) 817 818 def test_nested_cgi_path_issue21323(self): 819 res = self.request('/cgi-bin/child-dir/file3.py') 820 self.assertEqual( 821 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 822 (res.read(), res.getheader('Content-type'), res.status)) 823 824 def test_query_with_multiple_question_mark(self): 825 res = self.request('/cgi-bin/file4.py?a=b?c=d') 826 self.assertEqual( 827 (b'a=b?c=d' + self.linesep, 'text/html', HTTPStatus.OK), 828 (res.read(), res.getheader('Content-type'), res.status)) 829 830 def test_query_with_continuous_slashes(self): 831 res = self.request('/cgi-bin/file4.py?k=aa%2F%2Fbb&//q//p//=//a//b//') 832 self.assertEqual( 833 (b'k=aa%2F%2Fbb&//q//p//=//a//b//' + self.linesep, 834 'text/html', HTTPStatus.OK), 835 (res.read(), res.getheader('Content-type'), res.status)) 836 837 def test_cgi_path_in_sub_directories(self): 838 try: 839 CGIHTTPRequestHandler.cgi_directories.append('/sub/dir/cgi-bin') 840 res = self.request('/sub/dir/cgi-bin/file5.py') 841 self.assertEqual( 842 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 843 (res.read(), res.getheader('Content-type'), res.status)) 844 finally: 845 CGIHTTPRequestHandler.cgi_directories.remove('/sub/dir/cgi-bin') 846 847 def test_accept(self): 848 browser_accept = \ 849 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8' 850 tests = ( 851 ((('Accept', browser_accept),), browser_accept), 852 ((), ''), 853 # Hack case to get two values for the one header 854 ((('Accept', 'text/html'), ('ACCEPT', 'text/plain')), 855 'text/html,text/plain'), 856 ) 857 for headers, expected in tests: 858 headers = OrderedDict(headers) 859 with self.subTest(headers): 860 res = self.request('/cgi-bin/file6.py', 'GET', headers=headers) 861 self.assertEqual(http.HTTPStatus.OK, res.status) 862 expected = f"HTTP_ACCEPT={expected}".encode('ascii') 863 self.assertIn(expected, res.read()) 864 865 866class SocketlessRequestHandler(SimpleHTTPRequestHandler): 867 def __init__(self, directory=None): 868 request = mock.Mock() 869 request.makefile.return_value = BytesIO() 870 super().__init__(request, None, None, directory=directory) 871 872 self.get_called = False 873 self.protocol_version = "HTTP/1.1" 874 875 def do_GET(self): 876 self.get_called = True 877 self.send_response(HTTPStatus.OK) 878 self.send_header('Content-Type', 'text/html') 879 self.end_headers() 880 self.wfile.write(b'<html><body>Data</body></html>\r\n') 881 882 def log_message(self, format, *args): 883 pass 884 885class RejectingSocketlessRequestHandler(SocketlessRequestHandler): 886 def handle_expect_100(self): 887 self.send_error(HTTPStatus.EXPECTATION_FAILED) 888 return False 889 890 891class AuditableBytesIO: 892 893 def __init__(self): 894 self.datas = [] 895 896 def write(self, data): 897 self.datas.append(data) 898 899 def getData(self): 900 return b''.join(self.datas) 901 902 @property 903 def numWrites(self): 904 return len(self.datas) 905 906 907class BaseHTTPRequestHandlerTestCase(unittest.TestCase): 908 """Test the functionality of the BaseHTTPServer. 909 910 Test the support for the Expect 100-continue header. 911 """ 912 913 HTTPResponseMatch = re.compile(b'HTTP/1.[0-9]+ 200 OK') 914 915 def setUp (self): 916 self.handler = SocketlessRequestHandler() 917 918 def send_typical_request(self, message): 919 input = BytesIO(message) 920 output = BytesIO() 921 self.handler.rfile = input 922 self.handler.wfile = output 923 self.handler.handle_one_request() 924 output.seek(0) 925 return output.readlines() 926 927 def verify_get_called(self): 928 self.assertTrue(self.handler.get_called) 929 930 def verify_expected_headers(self, headers): 931 for fieldName in b'Server: ', b'Date: ', b'Content-Type: ': 932 self.assertEqual(sum(h.startswith(fieldName) for h in headers), 1) 933 934 def verify_http_server_response(self, response): 935 match = self.HTTPResponseMatch.search(response) 936 self.assertIsNotNone(match) 937 938 def test_http_1_1(self): 939 result = self.send_typical_request(b'GET / HTTP/1.1\r\n\r\n') 940 self.verify_http_server_response(result[0]) 941 self.verify_expected_headers(result[1:-1]) 942 self.verify_get_called() 943 self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') 944 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1') 945 self.assertEqual(self.handler.command, 'GET') 946 self.assertEqual(self.handler.path, '/') 947 self.assertEqual(self.handler.request_version, 'HTTP/1.1') 948 self.assertSequenceEqual(self.handler.headers.items(), ()) 949 950 def test_http_1_0(self): 951 result = self.send_typical_request(b'GET / HTTP/1.0\r\n\r\n') 952 self.verify_http_server_response(result[0]) 953 self.verify_expected_headers(result[1:-1]) 954 self.verify_get_called() 955 self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') 956 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.0') 957 self.assertEqual(self.handler.command, 'GET') 958 self.assertEqual(self.handler.path, '/') 959 self.assertEqual(self.handler.request_version, 'HTTP/1.0') 960 self.assertSequenceEqual(self.handler.headers.items(), ()) 961 962 def test_http_0_9(self): 963 result = self.send_typical_request(b'GET / HTTP/0.9\r\n\r\n') 964 self.assertEqual(len(result), 1) 965 self.assertEqual(result[0], b'<html><body>Data</body></html>\r\n') 966 self.verify_get_called() 967 968 def test_extra_space(self): 969 result = self.send_typical_request( 970 b'GET /spaced out HTTP/1.1\r\n' 971 b'Host: dummy\r\n' 972 b'\r\n' 973 ) 974 self.assertTrue(result[0].startswith(b'HTTP/1.1 400 ')) 975 self.verify_expected_headers(result[1:result.index(b'\r\n')]) 976 self.assertFalse(self.handler.get_called) 977 978 def test_with_continue_1_0(self): 979 result = self.send_typical_request(b'GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n') 980 self.verify_http_server_response(result[0]) 981 self.verify_expected_headers(result[1:-1]) 982 self.verify_get_called() 983 self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') 984 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.0') 985 self.assertEqual(self.handler.command, 'GET') 986 self.assertEqual(self.handler.path, '/') 987 self.assertEqual(self.handler.request_version, 'HTTP/1.0') 988 headers = (("Expect", "100-continue"),) 989 self.assertSequenceEqual(self.handler.headers.items(), headers) 990 991 def test_with_continue_1_1(self): 992 result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n') 993 self.assertEqual(result[0], b'HTTP/1.1 100 Continue\r\n') 994 self.assertEqual(result[1], b'\r\n') 995 self.assertEqual(result[2], b'HTTP/1.1 200 OK\r\n') 996 self.verify_expected_headers(result[2:-1]) 997 self.verify_get_called() 998 self.assertEqual(result[-1], b'<html><body>Data</body></html>\r\n') 999 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1') 1000 self.assertEqual(self.handler.command, 'GET') 1001 self.assertEqual(self.handler.path, '/') 1002 self.assertEqual(self.handler.request_version, 'HTTP/1.1') 1003 headers = (("Expect", "100-continue"),) 1004 self.assertSequenceEqual(self.handler.headers.items(), headers) 1005 1006 def test_header_buffering_of_send_error(self): 1007 1008 input = BytesIO(b'GET / HTTP/1.1\r\n\r\n') 1009 output = AuditableBytesIO() 1010 handler = SocketlessRequestHandler() 1011 handler.rfile = input 1012 handler.wfile = output 1013 handler.request_version = 'HTTP/1.1' 1014 handler.requestline = '' 1015 handler.command = None 1016 1017 handler.send_error(418) 1018 self.assertEqual(output.numWrites, 2) 1019 1020 def test_header_buffering_of_send_response_only(self): 1021 1022 input = BytesIO(b'GET / HTTP/1.1\r\n\r\n') 1023 output = AuditableBytesIO() 1024 handler = SocketlessRequestHandler() 1025 handler.rfile = input 1026 handler.wfile = output 1027 handler.request_version = 'HTTP/1.1' 1028 1029 handler.send_response_only(418) 1030 self.assertEqual(output.numWrites, 0) 1031 handler.end_headers() 1032 self.assertEqual(output.numWrites, 1) 1033 1034 def test_header_buffering_of_send_header(self): 1035 1036 input = BytesIO(b'GET / HTTP/1.1\r\n\r\n') 1037 output = AuditableBytesIO() 1038 handler = SocketlessRequestHandler() 1039 handler.rfile = input 1040 handler.wfile = output 1041 handler.request_version = 'HTTP/1.1' 1042 1043 handler.send_header('Foo', 'foo') 1044 handler.send_header('bar', 'bar') 1045 self.assertEqual(output.numWrites, 0) 1046 handler.end_headers() 1047 self.assertEqual(output.getData(), b'Foo: foo\r\nbar: bar\r\n\r\n') 1048 self.assertEqual(output.numWrites, 1) 1049 1050 def test_header_unbuffered_when_continue(self): 1051 1052 def _readAndReseek(f): 1053 pos = f.tell() 1054 f.seek(0) 1055 data = f.read() 1056 f.seek(pos) 1057 return data 1058 1059 input = BytesIO(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n') 1060 output = BytesIO() 1061 self.handler.rfile = input 1062 self.handler.wfile = output 1063 self.handler.request_version = 'HTTP/1.1' 1064 1065 self.handler.handle_one_request() 1066 self.assertNotEqual(_readAndReseek(output), b'') 1067 result = _readAndReseek(output).split(b'\r\n') 1068 self.assertEqual(result[0], b'HTTP/1.1 100 Continue') 1069 self.assertEqual(result[1], b'') 1070 self.assertEqual(result[2], b'HTTP/1.1 200 OK') 1071 1072 def test_with_continue_rejected(self): 1073 usual_handler = self.handler # Save to avoid breaking any subsequent tests. 1074 self.handler = RejectingSocketlessRequestHandler() 1075 result = self.send_typical_request(b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n') 1076 self.assertEqual(result[0], b'HTTP/1.1 417 Expectation Failed\r\n') 1077 self.verify_expected_headers(result[1:-1]) 1078 # The expect handler should short circuit the usual get method by 1079 # returning false here, so get_called should be false 1080 self.assertFalse(self.handler.get_called) 1081 self.assertEqual(sum(r == b'Connection: close\r\n' for r in result[1:-1]), 1) 1082 self.handler = usual_handler # Restore to avoid breaking any subsequent tests. 1083 1084 def test_request_length(self): 1085 # Issue #10714: huge request lines are discarded, to avoid Denial 1086 # of Service attacks. 1087 result = self.send_typical_request(b'GET ' + b'x' * 65537) 1088 self.assertEqual(result[0], b'HTTP/1.1 414 Request-URI Too Long\r\n') 1089 self.assertFalse(self.handler.get_called) 1090 self.assertIsInstance(self.handler.requestline, str) 1091 1092 def test_header_length(self): 1093 # Issue #6791: same for headers 1094 result = self.send_typical_request( 1095 b'GET / HTTP/1.1\r\nX-Foo: bar' + b'r' * 65537 + b'\r\n\r\n') 1096 self.assertEqual(result[0], b'HTTP/1.1 431 Line too long\r\n') 1097 self.assertFalse(self.handler.get_called) 1098 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1') 1099 1100 def test_too_many_headers(self): 1101 result = self.send_typical_request( 1102 b'GET / HTTP/1.1\r\n' + b'X-Foo: bar\r\n' * 101 + b'\r\n') 1103 self.assertEqual(result[0], b'HTTP/1.1 431 Too many headers\r\n') 1104 self.assertFalse(self.handler.get_called) 1105 self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1') 1106 1107 def test_html_escape_on_error(self): 1108 result = self.send_typical_request( 1109 b'<script>alert("hello")</script> / HTTP/1.1') 1110 result = b''.join(result) 1111 text = '<script>alert("hello")</script>' 1112 self.assertIn(html.escape(text, quote=False).encode('ascii'), result) 1113 1114 def test_close_connection(self): 1115 # handle_one_request() should be repeatedly called until 1116 # it sets close_connection 1117 def handle_one_request(): 1118 self.handler.close_connection = next(close_values) 1119 self.handler.handle_one_request = handle_one_request 1120 1121 close_values = iter((True,)) 1122 self.handler.handle() 1123 self.assertRaises(StopIteration, next, close_values) 1124 1125 close_values = iter((False, False, True)) 1126 self.handler.handle() 1127 self.assertRaises(StopIteration, next, close_values) 1128 1129 def test_date_time_string(self): 1130 now = time.time() 1131 # this is the old code that formats the timestamp 1132 year, month, day, hh, mm, ss, wd, y, z = time.gmtime(now) 1133 expected = "%s, %02d %3s %4d %02d:%02d:%02d GMT" % ( 1134 self.handler.weekdayname[wd], 1135 day, 1136 self.handler.monthname[month], 1137 year, hh, mm, ss 1138 ) 1139 self.assertEqual(self.handler.date_time_string(timestamp=now), expected) 1140 1141 1142class SimpleHTTPRequestHandlerTestCase(unittest.TestCase): 1143 """ Test url parsing """ 1144 def setUp(self): 1145 self.translated_1 = os.path.join(os.getcwd(), 'filename') 1146 self.translated_2 = os.path.join('foo', 'filename') 1147 self.translated_3 = os.path.join('bar', 'filename') 1148 self.handler_1 = SocketlessRequestHandler() 1149 self.handler_2 = SocketlessRequestHandler(directory='foo') 1150 self.handler_3 = SocketlessRequestHandler(directory=pathlib.PurePath('bar')) 1151 1152 def test_query_arguments(self): 1153 path = self.handler_1.translate_path('/filename') 1154 self.assertEqual(path, self.translated_1) 1155 path = self.handler_2.translate_path('/filename') 1156 self.assertEqual(path, self.translated_2) 1157 path = self.handler_3.translate_path('/filename') 1158 self.assertEqual(path, self.translated_3) 1159 1160 path = self.handler_1.translate_path('/filename?foo=bar') 1161 self.assertEqual(path, self.translated_1) 1162 path = self.handler_2.translate_path('/filename?foo=bar') 1163 self.assertEqual(path, self.translated_2) 1164 path = self.handler_3.translate_path('/filename?foo=bar') 1165 self.assertEqual(path, self.translated_3) 1166 1167 path = self.handler_1.translate_path('/filename?a=b&spam=eggs#zot') 1168 self.assertEqual(path, self.translated_1) 1169 path = self.handler_2.translate_path('/filename?a=b&spam=eggs#zot') 1170 self.assertEqual(path, self.translated_2) 1171 path = self.handler_3.translate_path('/filename?a=b&spam=eggs#zot') 1172 self.assertEqual(path, self.translated_3) 1173 1174 def test_start_with_double_slash(self): 1175 path = self.handler_1.translate_path('//filename') 1176 self.assertEqual(path, self.translated_1) 1177 path = self.handler_2.translate_path('//filename') 1178 self.assertEqual(path, self.translated_2) 1179 path = self.handler_3.translate_path('//filename') 1180 self.assertEqual(path, self.translated_3) 1181 1182 path = self.handler_1.translate_path('//filename?foo=bar') 1183 self.assertEqual(path, self.translated_1) 1184 path = self.handler_2.translate_path('//filename?foo=bar') 1185 self.assertEqual(path, self.translated_2) 1186 path = self.handler_3.translate_path('//filename?foo=bar') 1187 self.assertEqual(path, self.translated_3) 1188 1189 def test_windows_colon(self): 1190 with support.swap_attr(server.os, 'path', ntpath): 1191 path = self.handler_1.translate_path('c:c:c:foo/filename') 1192 path = path.replace(ntpath.sep, os.sep) 1193 self.assertEqual(path, self.translated_1) 1194 path = self.handler_2.translate_path('c:c:c:foo/filename') 1195 path = path.replace(ntpath.sep, os.sep) 1196 self.assertEqual(path, self.translated_2) 1197 path = self.handler_3.translate_path('c:c:c:foo/filename') 1198 path = path.replace(ntpath.sep, os.sep) 1199 self.assertEqual(path, self.translated_3) 1200 1201 path = self.handler_1.translate_path('\\c:../filename') 1202 path = path.replace(ntpath.sep, os.sep) 1203 self.assertEqual(path, self.translated_1) 1204 path = self.handler_2.translate_path('\\c:../filename') 1205 path = path.replace(ntpath.sep, os.sep) 1206 self.assertEqual(path, self.translated_2) 1207 path = self.handler_3.translate_path('\\c:../filename') 1208 path = path.replace(ntpath.sep, os.sep) 1209 self.assertEqual(path, self.translated_3) 1210 1211 path = self.handler_1.translate_path('c:\\c:..\\foo/filename') 1212 path = path.replace(ntpath.sep, os.sep) 1213 self.assertEqual(path, self.translated_1) 1214 path = self.handler_2.translate_path('c:\\c:..\\foo/filename') 1215 path = path.replace(ntpath.sep, os.sep) 1216 self.assertEqual(path, self.translated_2) 1217 path = self.handler_3.translate_path('c:\\c:..\\foo/filename') 1218 path = path.replace(ntpath.sep, os.sep) 1219 self.assertEqual(path, self.translated_3) 1220 1221 path = self.handler_1.translate_path('c:c:foo\\c:c:bar/filename') 1222 path = path.replace(ntpath.sep, os.sep) 1223 self.assertEqual(path, self.translated_1) 1224 path = self.handler_2.translate_path('c:c:foo\\c:c:bar/filename') 1225 path = path.replace(ntpath.sep, os.sep) 1226 self.assertEqual(path, self.translated_2) 1227 path = self.handler_3.translate_path('c:c:foo\\c:c:bar/filename') 1228 path = path.replace(ntpath.sep, os.sep) 1229 self.assertEqual(path, self.translated_3) 1230 1231 1232class MiscTestCase(unittest.TestCase): 1233 def test_all(self): 1234 expected = [] 1235 denylist = {'executable', 'nobody_uid', 'test'} 1236 for name in dir(server): 1237 if name.startswith('_') or name in denylist: 1238 continue 1239 module_object = getattr(server, name) 1240 if getattr(module_object, '__module__', None) == 'http.server': 1241 expected.append(name) 1242 self.assertCountEqual(server.__all__, expected) 1243 1244 1245class ScriptTestCase(unittest.TestCase): 1246 1247 def mock_server_class(self): 1248 return mock.MagicMock( 1249 return_value=mock.MagicMock( 1250 __enter__=mock.MagicMock( 1251 return_value=mock.MagicMock( 1252 socket=mock.MagicMock( 1253 getsockname=lambda: ('', 0), 1254 ), 1255 ), 1256 ), 1257 ), 1258 ) 1259 1260 @mock.patch('builtins.print') 1261 def test_server_test_unspec(self, _): 1262 mock_server = self.mock_server_class() 1263 server.test(ServerClass=mock_server, bind=None) 1264 self.assertIn( 1265 mock_server.address_family, 1266 (socket.AF_INET6, socket.AF_INET), 1267 ) 1268 1269 @mock.patch('builtins.print') 1270 def test_server_test_localhost(self, _): 1271 mock_server = self.mock_server_class() 1272 server.test(ServerClass=mock_server, bind="localhost") 1273 self.assertIn( 1274 mock_server.address_family, 1275 (socket.AF_INET6, socket.AF_INET), 1276 ) 1277 1278 ipv6_addrs = ( 1279 "::", 1280 "2001:0db8:85a3:0000:0000:8a2e:0370:7334", 1281 "::1", 1282 ) 1283 1284 ipv4_addrs = ( 1285 "0.0.0.0", 1286 "8.8.8.8", 1287 "127.0.0.1", 1288 ) 1289 1290 @mock.patch('builtins.print') 1291 def test_server_test_ipv6(self, _): 1292 for bind in self.ipv6_addrs: 1293 mock_server = self.mock_server_class() 1294 server.test(ServerClass=mock_server, bind=bind) 1295 self.assertEqual(mock_server.address_family, socket.AF_INET6) 1296 1297 @mock.patch('builtins.print') 1298 def test_server_test_ipv4(self, _): 1299 for bind in self.ipv4_addrs: 1300 mock_server = self.mock_server_class() 1301 server.test(ServerClass=mock_server, bind=bind) 1302 self.assertEqual(mock_server.address_family, socket.AF_INET) 1303 1304 1305def setUpModule(): 1306 unittest.addModuleCleanup(os.chdir, os.getcwd()) 1307 1308 1309if __name__ == '__main__': 1310 unittest.main() 1311