1"""Unittests for the various HTTPServer modules. 2 3Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>, 4Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest. 5""" 6 7from http.server import BaseHTTPRequestHandler, HTTPServer, \ 8 SimpleHTTPRequestHandler, CGIHTTPRequestHandler 9from http import server, HTTPStatus 10 11import os 12import socket 13import sys 14import re 15import base64 16import ntpath 17import shutil 18import email.message 19import email.utils 20import html 21import http.client 22import urllib.parse 23import tempfile 24import time 25import datetime 26import threading 27from unittest import mock 28from io import BytesIO 29 30import unittest 31from test import support 32 33 34class NoLogRequestHandler: 35 def log_message(self, *args): 36 # don't write log messages to stderr 37 pass 38 39 def read(self, n=None): 40 return '' 41 42 43class TestServerThread(threading.Thread): 44 def __init__(self, test_object, request_handler): 45 threading.Thread.__init__(self) 46 self.request_handler = request_handler 47 self.test_object = test_object 48 49 def run(self): 50 self.server = HTTPServer(('localhost', 0), self.request_handler) 51 self.test_object.HOST, self.test_object.PORT = self.server.socket.getsockname() 52 self.test_object.server_started.set() 53 self.test_object = None 54 try: 55 self.server.serve_forever(0.05) 56 finally: 57 self.server.server_close() 58 59 def stop(self): 60 self.server.shutdown() 61 self.join() 62 63 64class BaseTestCase(unittest.TestCase): 65 def setUp(self): 66 self._threads = support.threading_setup() 67 os.environ = support.EnvironmentVarGuard() 68 self.server_started = threading.Event() 69 self.thread = TestServerThread(self, self.request_handler) 70 self.thread.start() 71 self.server_started.wait() 72 73 def tearDown(self): 74 self.thread.stop() 75 self.thread = None 76 os.environ.__exit__() 77 support.threading_cleanup(*self._threads) 78 79 def request(self, uri, method='GET', body=None, headers={}): 
80 self.connection = http.client.HTTPConnection(self.HOST, self.PORT) 81 self.connection.request(method, uri, body, headers) 82 return self.connection.getresponse() 83 84 85class BaseHTTPServerTestCase(BaseTestCase): 86 class request_handler(NoLogRequestHandler, BaseHTTPRequestHandler): 87 protocol_version = 'HTTP/1.1' 88 default_request_version = 'HTTP/1.1' 89 90 def do_TEST(self): 91 self.send_response(HTTPStatus.NO_CONTENT) 92 self.send_header('Content-Type', 'text/html') 93 self.send_header('Connection', 'close') 94 self.end_headers() 95 96 def do_KEEP(self): 97 self.send_response(HTTPStatus.NO_CONTENT) 98 self.send_header('Content-Type', 'text/html') 99 self.send_header('Connection', 'keep-alive') 100 self.end_headers() 101 102 def do_KEYERROR(self): 103 self.send_error(999) 104 105 def do_NOTFOUND(self): 106 self.send_error(HTTPStatus.NOT_FOUND) 107 108 def do_EXPLAINERROR(self): 109 self.send_error(999, "Short Message", 110 "This is a long \n explanation") 111 112 def do_CUSTOM(self): 113 self.send_response(999) 114 self.send_header('Content-Type', 'text/html') 115 self.send_header('Connection', 'close') 116 self.end_headers() 117 118 def do_LATINONEHEADER(self): 119 self.send_response(999) 120 self.send_header('X-Special', 'Dängerous Mind') 121 self.send_header('Connection', 'close') 122 self.end_headers() 123 body = self.headers['x-special-incoming'].encode('utf-8') 124 self.wfile.write(body) 125 126 def do_SEND_ERROR(self): 127 self.send_error(int(self.path[1:])) 128 129 def do_HEAD(self): 130 self.send_error(int(self.path[1:])) 131 132 def setUp(self): 133 BaseTestCase.setUp(self) 134 self.con = http.client.HTTPConnection(self.HOST, self.PORT) 135 self.con.connect() 136 137 def test_command(self): 138 self.con.request('GET', '/') 139 res = self.con.getresponse() 140 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 141 142 def test_request_line_trimming(self): 143 self.con._http_vsn_str = 'HTTP/1.1\n' 144 self.con.putrequest('XYZBOGUS', '/') 145 
self.con.endheaders() 146 res = self.con.getresponse() 147 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 148 149 def test_version_bogus(self): 150 self.con._http_vsn_str = 'FUBAR' 151 self.con.putrequest('GET', '/') 152 self.con.endheaders() 153 res = self.con.getresponse() 154 self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) 155 156 def test_version_digits(self): 157 self.con._http_vsn_str = 'HTTP/9.9.9' 158 self.con.putrequest('GET', '/') 159 self.con.endheaders() 160 res = self.con.getresponse() 161 self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) 162 163 def test_version_none_get(self): 164 self.con._http_vsn_str = '' 165 self.con.putrequest('GET', '/') 166 self.con.endheaders() 167 res = self.con.getresponse() 168 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 169 170 def test_version_none(self): 171 # Test that a valid method is rejected when not HTTP/1.x 172 self.con._http_vsn_str = '' 173 self.con.putrequest('CUSTOM', '/') 174 self.con.endheaders() 175 res = self.con.getresponse() 176 self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) 177 178 def test_version_invalid(self): 179 self.con._http_vsn = 99 180 self.con._http_vsn_str = 'HTTP/9.9' 181 self.con.putrequest('GET', '/') 182 self.con.endheaders() 183 res = self.con.getresponse() 184 self.assertEqual(res.status, HTTPStatus.HTTP_VERSION_NOT_SUPPORTED) 185 186 def test_send_blank(self): 187 self.con._http_vsn_str = '' 188 self.con.putrequest('', '') 189 self.con.endheaders() 190 res = self.con.getresponse() 191 self.assertEqual(res.status, HTTPStatus.BAD_REQUEST) 192 193 def test_header_close(self): 194 self.con.putrequest('GET', '/') 195 self.con.putheader('Connection', 'close') 196 self.con.endheaders() 197 res = self.con.getresponse() 198 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 199 200 def test_header_keep_alive(self): 201 self.con._http_vsn_str = 'HTTP/1.1' 202 self.con.putrequest('GET', '/') 203 self.con.putheader('Connection', 'keep-alive') 204 
self.con.endheaders() 205 res = self.con.getresponse() 206 self.assertEqual(res.status, HTTPStatus.NOT_IMPLEMENTED) 207 208 def test_handler(self): 209 self.con.request('TEST', '/') 210 res = self.con.getresponse() 211 self.assertEqual(res.status, HTTPStatus.NO_CONTENT) 212 213 def test_return_header_keep_alive(self): 214 self.con.request('KEEP', '/') 215 res = self.con.getresponse() 216 self.assertEqual(res.getheader('Connection'), 'keep-alive') 217 self.con.request('TEST', '/') 218 self.addCleanup(self.con.close) 219 220 def test_internal_key_error(self): 221 self.con.request('KEYERROR', '/') 222 res = self.con.getresponse() 223 self.assertEqual(res.status, 999) 224 225 def test_return_custom_status(self): 226 self.con.request('CUSTOM', '/') 227 res = self.con.getresponse() 228 self.assertEqual(res.status, 999) 229 230 def test_return_explain_error(self): 231 self.con.request('EXPLAINERROR', '/') 232 res = self.con.getresponse() 233 self.assertEqual(res.status, 999) 234 self.assertTrue(int(res.getheader('Content-Length'))) 235 236 def test_latin1_header(self): 237 self.con.request('LATINONEHEADER', '/', headers={ 238 'X-Special-Incoming': 'Ärger mit Unicode' 239 }) 240 res = self.con.getresponse() 241 self.assertEqual(res.getheader('X-Special'), 'Dängerous Mind') 242 self.assertEqual(res.read(), 'Ärger mit Unicode'.encode('utf-8')) 243 244 def test_error_content_length(self): 245 # Issue #16088: standard error responses should have a content-length 246 self.con.request('NOTFOUND', '/') 247 res = self.con.getresponse() 248 self.assertEqual(res.status, HTTPStatus.NOT_FOUND) 249 250 data = res.read() 251 self.assertEqual(int(res.getheader('Content-Length')), len(data)) 252 253 def test_send_error(self): 254 allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED, 255 HTTPStatus.RESET_CONTENT) 256 for code in (HTTPStatus.NO_CONTENT, HTTPStatus.NOT_MODIFIED, 257 HTTPStatus.PROCESSING, HTTPStatus.RESET_CONTENT, 258 HTTPStatus.SWITCHING_PROTOCOLS): 259 
self.con.request('SEND_ERROR', '/{}'.format(code)) 260 res = self.con.getresponse() 261 self.assertEqual(code, res.status) 262 self.assertEqual(None, res.getheader('Content-Length')) 263 self.assertEqual(None, res.getheader('Content-Type')) 264 if code not in allow_transfer_encoding_codes: 265 self.assertEqual(None, res.getheader('Transfer-Encoding')) 266 267 data = res.read() 268 self.assertEqual(b'', data) 269 270 def test_head_via_send_error(self): 271 allow_transfer_encoding_codes = (HTTPStatus.NOT_MODIFIED, 272 HTTPStatus.RESET_CONTENT) 273 for code in (HTTPStatus.OK, HTTPStatus.NO_CONTENT, 274 HTTPStatus.NOT_MODIFIED, HTTPStatus.RESET_CONTENT, 275 HTTPStatus.SWITCHING_PROTOCOLS): 276 self.con.request('HEAD', '/{}'.format(code)) 277 res = self.con.getresponse() 278 self.assertEqual(code, res.status) 279 if code == HTTPStatus.OK: 280 self.assertTrue(int(res.getheader('Content-Length')) > 0) 281 self.assertIn('text/html', res.getheader('Content-Type')) 282 else: 283 self.assertEqual(None, res.getheader('Content-Length')) 284 self.assertEqual(None, res.getheader('Content-Type')) 285 if code not in allow_transfer_encoding_codes: 286 self.assertEqual(None, res.getheader('Transfer-Encoding')) 287 288 data = res.read() 289 self.assertEqual(b'', data) 290 291 292class RequestHandlerLoggingTestCase(BaseTestCase): 293 class request_handler(BaseHTTPRequestHandler): 294 protocol_version = 'HTTP/1.1' 295 default_request_version = 'HTTP/1.1' 296 297 def do_GET(self): 298 self.send_response(HTTPStatus.OK) 299 self.end_headers() 300 301 def do_ERROR(self): 302 self.send_error(HTTPStatus.NOT_FOUND, 'File not found') 303 304 def test_get(self): 305 self.con = http.client.HTTPConnection(self.HOST, self.PORT) 306 self.con.connect() 307 308 with support.captured_stderr() as err: 309 self.con.request('GET', '/') 310 self.con.getresponse() 311 312 self.assertTrue( 313 err.getvalue().endswith('"GET / HTTP/1.1" 200 -\n')) 314 315 def test_err(self): 316 self.con = 
http.client.HTTPConnection(self.HOST, self.PORT) 317 self.con.connect() 318 319 with support.captured_stderr() as err: 320 self.con.request('ERROR', '/') 321 self.con.getresponse() 322 323 lines = err.getvalue().split('\n') 324 self.assertTrue(lines[0].endswith('code 404, message File not found')) 325 self.assertTrue(lines[1].endswith('"ERROR / HTTP/1.1" 404 -')) 326 327 328class SimpleHTTPServerTestCase(BaseTestCase): 329 class request_handler(NoLogRequestHandler, SimpleHTTPRequestHandler): 330 pass 331 332 def setUp(self): 333 super().setUp() 334 self.cwd = os.getcwd() 335 basetempdir = tempfile.gettempdir() 336 os.chdir(basetempdir) 337 self.data = b'We are the knights who say Ni!' 338 self.tempdir = tempfile.mkdtemp(dir=basetempdir) 339 self.tempdir_name = os.path.basename(self.tempdir) 340 self.base_url = '/' + self.tempdir_name 341 tempname = os.path.join(self.tempdir, 'test') 342 with open(tempname, 'wb') as temp: 343 temp.write(self.data) 344 temp.flush() 345 mtime = os.stat(tempname).st_mtime 346 # compute last modification datetime for browser cache tests 347 last_modif = datetime.datetime.fromtimestamp(mtime, 348 datetime.timezone.utc) 349 self.last_modif_datetime = last_modif.replace(microsecond=0) 350 self.last_modif_header = email.utils.formatdate( 351 last_modif.timestamp(), usegmt=True) 352 353 def tearDown(self): 354 try: 355 os.chdir(self.cwd) 356 try: 357 shutil.rmtree(self.tempdir) 358 except: 359 pass 360 finally: 361 super().tearDown() 362 363 def check_status_and_reason(self, response, status, data=None): 364 def close_conn(): 365 """Don't close reader yet so we can check if there was leftover 366 buffered input""" 367 nonlocal reader 368 reader = response.fp 369 response.fp = None 370 reader = None 371 response._close_conn = close_conn 372 373 body = response.read() 374 self.assertTrue(response) 375 self.assertEqual(response.status, status) 376 self.assertIsNotNone(response.reason) 377 if data: 378 self.assertEqual(data, body) 379 # Ensure 
the server has not set up a persistent connection, and has 380 # not sent any extra data 381 self.assertEqual(response.version, 10) 382 self.assertEqual(response.msg.get("Connection", "close"), "close") 383 self.assertEqual(reader.read(30), b'', 'Connection should be closed') 384 385 reader.close() 386 return body 387 388 @unittest.skipIf(sys.platform == 'darwin', 389 'undecodable name cannot always be decoded on macOS') 390 @unittest.skipIf(sys.platform == 'win32', 391 'undecodable name cannot be decoded on win32') 392 @unittest.skipUnless(support.TESTFN_UNDECODABLE, 393 'need support.TESTFN_UNDECODABLE') 394 def test_undecodable_filename(self): 395 enc = sys.getfilesystemencoding() 396 filename = os.fsdecode(support.TESTFN_UNDECODABLE) + '.txt' 397 with open(os.path.join(self.tempdir, filename), 'wb') as f: 398 f.write(support.TESTFN_UNDECODABLE) 399 response = self.request(self.base_url + '/') 400 if sys.platform == 'darwin': 401 # On Mac OS the HFS+ filesystem replaces bytes that aren't valid 402 # UTF-8 into a percent-encoded value. 403 for name in os.listdir(self.tempdir): 404 if name != 'test': # Ignore a filename created in setUp(). 405 filename = name 406 break 407 body = self.check_status_and_reason(response, HTTPStatus.OK) 408 quotedname = urllib.parse.quote(filename, errors='surrogatepass') 409 self.assertIn(('href="%s"' % quotedname) 410 .encode(enc, 'surrogateescape'), body) 411 self.assertIn(('>%s<' % html.escape(filename, quote=False)) 412 .encode(enc, 'surrogateescape'), body) 413 response = self.request(self.base_url + '/' + quotedname) 414 self.check_status_and_reason(response, HTTPStatus.OK, 415 data=support.TESTFN_UNDECODABLE) 416 417 def test_get_dir_redirect_location_domain_injection_bug(self): 418 """Ensure //evil.co/..%2f../../X does not put //evil.co/ in Location. 419 420 //netloc/ in a Location header is a redirect to a new host. 
421 https://github.com/python/cpython/issues/87389 422 423 This checks that a path resolving to a directory on our server cannot 424 resolve into a redirect to another server. 425 """ 426 os.mkdir(os.path.join(self.tempdir, 'existing_directory')) 427 url = f'/python.org/..%2f..%2f..%2f..%2f..%2f../%0a%0d/../{self.tempdir_name}/existing_directory' 428 expected_location = f'{url}/' # /python.org.../ single slash single prefix, trailing slash 429 # Canonicalizes to /tmp/tempdir_name/existing_directory which does 430 # exist and is a dir, triggering the 301 redirect logic. 431 response = self.request(url) 432 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 433 location = response.getheader('Location') 434 self.assertEqual(location, expected_location, msg='non-attack failed!') 435 436 # //python.org... multi-slash prefix, no trailing slash 437 attack_url = f'/{url}' 438 response = self.request(attack_url) 439 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 440 location = response.getheader('Location') 441 self.assertFalse(location.startswith('//'), msg=location) 442 self.assertEqual(location, expected_location, 443 msg='Expected Location header to start with a single / and ' 444 'end with a / as this is a directory redirect.') 445 446 # ///python.org... triple-slash prefix, no trailing slash 447 attack3_url = f'//{url}' 448 response = self.request(attack3_url) 449 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 450 self.assertEqual(response.getheader('Location'), expected_location) 451 452 # If the second word in the http request (Request-URI for the http 453 # method) is a full URI, we don't worry about it, as that'll be parsed 454 # and reassembled as a full URI within BaseHTTPRequestHandler.send_head 455 # so no errant scheme-less //netloc//evil.co/ domain mixup can happen. 
456 attack_scheme_netloc_2slash_url = f'https://pypi.org/{url}' 457 expected_scheme_netloc_location = f'{attack_scheme_netloc_2slash_url}/' 458 response = self.request(attack_scheme_netloc_2slash_url) 459 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 460 location = response.getheader('Location') 461 # We're just ensuring that the scheme and domain make it through, if 462 # there are or aren't multiple slashes at the start of the path that 463 # follows that isn't important in this Location: header. 464 self.assertTrue(location.startswith('https://pypi.org/'), msg=location) 465 466 def test_get(self): 467 #constructs the path relative to the root directory of the HTTPServer 468 response = self.request(self.base_url + '/test') 469 self.check_status_and_reason(response, HTTPStatus.OK, data=self.data) 470 # check for trailing "/" which should return 404. See Issue17324 471 response = self.request(self.base_url + '/test/') 472 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 473 response = self.request(self.base_url + '/') 474 self.check_status_and_reason(response, HTTPStatus.OK) 475 response = self.request(self.base_url) 476 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 477 response = self.request(self.base_url + '/?hi=2') 478 self.check_status_and_reason(response, HTTPStatus.OK) 479 response = self.request(self.base_url + '?hi=1') 480 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 481 self.assertEqual(response.getheader("Location"), 482 self.base_url + "/?hi=1") 483 response = self.request('/ThisDoesNotExist') 484 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 485 response = self.request('/' + 'ThisDoesNotExist' + '/') 486 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 487 488 data = b"Dummy index file\r\n" 489 with open(os.path.join(self.tempdir_name, 'index.html'), 'wb') as f: 490 f.write(data) 491 response = self.request(self.base_url + '/') 492 
self.check_status_and_reason(response, HTTPStatus.OK, data) 493 494 # chmod() doesn't work as expected on Windows, and filesystem 495 # permissions are ignored by root on Unix. 496 if os.name == 'posix' and os.geteuid() != 0: 497 os.chmod(self.tempdir, 0) 498 try: 499 response = self.request(self.base_url + '/') 500 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 501 finally: 502 os.chmod(self.tempdir, 0o755) 503 504 def test_head(self): 505 response = self.request( 506 self.base_url + '/test', method='HEAD') 507 self.check_status_and_reason(response, HTTPStatus.OK) 508 self.assertEqual(response.getheader('content-length'), 509 str(len(self.data))) 510 self.assertEqual(response.getheader('content-type'), 511 'application/octet-stream') 512 513 def test_browser_cache(self): 514 """Check that when a request to /test is sent with the request header 515 If-Modified-Since set to date of last modification, the server returns 516 status code 304, not 200 517 """ 518 headers = email.message.Message() 519 headers['If-Modified-Since'] = self.last_modif_header 520 response = self.request(self.base_url + '/test', headers=headers) 521 self.check_status_and_reason(response, HTTPStatus.NOT_MODIFIED) 522 523 # one hour after last modification : must return 304 524 new_dt = self.last_modif_datetime + datetime.timedelta(hours=1) 525 headers = email.message.Message() 526 headers['If-Modified-Since'] = email.utils.format_datetime(new_dt, 527 usegmt=True) 528 response = self.request(self.base_url + '/test', headers=headers) 529 self.check_status_and_reason(response, HTTPStatus.NOT_MODIFIED) 530 531 def test_browser_cache_file_changed(self): 532 # with If-Modified-Since earlier than Last-Modified, must return 200 533 dt = self.last_modif_datetime 534 # build datetime object : 365 days before last modification 535 old_dt = dt - datetime.timedelta(days=365) 536 headers = email.message.Message() 537 headers['If-Modified-Since'] = email.utils.format_datetime(old_dt, 538 
usegmt=True) 539 response = self.request(self.base_url + '/test', headers=headers) 540 self.check_status_and_reason(response, HTTPStatus.OK) 541 542 def test_browser_cache_with_If_None_Match_header(self): 543 # if If-None-Match header is present, ignore If-Modified-Since 544 545 headers = email.message.Message() 546 headers['If-Modified-Since'] = self.last_modif_header 547 headers['If-None-Match'] = "*" 548 response = self.request(self.base_url + '/test', headers=headers) 549 self.check_status_and_reason(response, HTTPStatus.OK) 550 551 def test_invalid_requests(self): 552 response = self.request('/', method='FOO') 553 self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED) 554 # requests must be case sensitive,so this should fail too 555 response = self.request('/', method='custom') 556 self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED) 557 response = self.request('/', method='GETs') 558 self.check_status_and_reason(response, HTTPStatus.NOT_IMPLEMENTED) 559 560 def test_last_modified(self): 561 """Checks that the datetime returned in Last-Modified response header 562 is the actual datetime of last modification, rounded to the second 563 """ 564 response = self.request(self.base_url + '/test') 565 self.check_status_and_reason(response, HTTPStatus.OK, data=self.data) 566 last_modif_header = response.headers['Last-modified'] 567 self.assertEqual(last_modif_header, self.last_modif_header) 568 569 def test_path_without_leading_slash(self): 570 response = self.request(self.tempdir_name + '/test') 571 self.check_status_and_reason(response, HTTPStatus.OK, data=self.data) 572 response = self.request(self.tempdir_name + '/test/') 573 self.check_status_and_reason(response, HTTPStatus.NOT_FOUND) 574 response = self.request(self.tempdir_name + '/') 575 self.check_status_and_reason(response, HTTPStatus.OK) 576 response = self.request(self.tempdir_name) 577 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 578 response = 
self.request(self.tempdir_name + '/?hi=2') 579 self.check_status_and_reason(response, HTTPStatus.OK) 580 response = self.request(self.tempdir_name + '?hi=1') 581 self.check_status_and_reason(response, HTTPStatus.MOVED_PERMANENTLY) 582 self.assertEqual(response.getheader("Location"), 583 self.tempdir_name + "/?hi=1") 584 585 def test_html_escape_filename(self): 586 filename = '<test&>.txt' 587 fullpath = os.path.join(self.tempdir, filename) 588 589 try: 590 open(fullpath, 'w').close() 591 except OSError: 592 raise unittest.SkipTest('Can not create file %s on current file ' 593 'system' % filename) 594 595 try: 596 response = self.request(self.base_url + '/') 597 body = self.check_status_and_reason(response, HTTPStatus.OK) 598 enc = response.headers.get_content_charset() 599 finally: 600 os.unlink(fullpath) # avoid affecting test_undecodable_filename 601 602 self.assertIsNotNone(enc) 603 html_text = '>%s<' % html.escape(filename, quote=False) 604 self.assertIn(html_text.encode(enc), body) 605 606 607cgi_file1 = """\ 608#!%s 609 610print("Content-type: text/html") 611print() 612print("Hello World") 613""" 614 615cgi_file2 = """\ 616#!%s 617import cgi 618 619print("Content-type: text/html") 620print() 621 622form = cgi.FieldStorage() 623print("%%s, %%s, %%s" %% (form.getfirst("spam"), form.getfirst("eggs"), 624 form.getfirst("bacon"))) 625""" 626 627cgi_file4 = """\ 628#!%s 629import os 630 631print("Content-type: text/html") 632print() 633 634print(os.environ["%s"]) 635""" 636 637 638@unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0, 639 "This test can't be run reliably as root (issue #13308).") 640class CGIHTTPServerTestCase(BaseTestCase): 641 class request_handler(NoLogRequestHandler, CGIHTTPRequestHandler): 642 pass 643 644 linesep = os.linesep.encode('ascii') 645 646 def setUp(self): 647 BaseTestCase.setUp(self) 648 self.cwd = os.getcwd() 649 self.parent_dir = tempfile.mkdtemp() 650 self.cgi_dir = os.path.join(self.parent_dir, 'cgi-bin') 651 
self.cgi_child_dir = os.path.join(self.cgi_dir, 'child-dir') 652 os.mkdir(self.cgi_dir) 653 os.mkdir(self.cgi_child_dir) 654 self.nocgi_path = None 655 self.file1_path = None 656 self.file2_path = None 657 self.file3_path = None 658 self.file4_path = None 659 660 # The shebang line should be pure ASCII: use symlink if possible. 661 # See issue #7668. 662 self._pythonexe_symlink = None 663 if support.can_symlink(): 664 self.pythonexe = os.path.join(self.parent_dir, 'python') 665 self._pythonexe_symlink = support.PythonSymlink(self.pythonexe).__enter__() 666 else: 667 self.pythonexe = sys.executable 668 669 try: 670 # The python executable path is written as the first line of the 671 # CGI Python script. The encoding cookie cannot be used, and so the 672 # path should be encodable to the default script encoding (utf-8) 673 self.pythonexe.encode('utf-8') 674 except UnicodeEncodeError: 675 self.tearDown() 676 self.skipTest("Python executable path is not encodable to utf-8") 677 678 self.nocgi_path = os.path.join(self.parent_dir, 'nocgi.py') 679 with open(self.nocgi_path, 'w') as fp: 680 fp.write(cgi_file1 % self.pythonexe) 681 os.chmod(self.nocgi_path, 0o777) 682 683 self.file1_path = os.path.join(self.cgi_dir, 'file1.py') 684 with open(self.file1_path, 'w', encoding='utf-8') as file1: 685 file1.write(cgi_file1 % self.pythonexe) 686 os.chmod(self.file1_path, 0o777) 687 688 self.file2_path = os.path.join(self.cgi_dir, 'file2.py') 689 with open(self.file2_path, 'w', encoding='utf-8') as file2: 690 file2.write(cgi_file2 % self.pythonexe) 691 os.chmod(self.file2_path, 0o777) 692 693 self.file3_path = os.path.join(self.cgi_child_dir, 'file3.py') 694 with open(self.file3_path, 'w', encoding='utf-8') as file3: 695 file3.write(cgi_file1 % self.pythonexe) 696 os.chmod(self.file3_path, 0o777) 697 698 self.file4_path = os.path.join(self.cgi_dir, 'file4.py') 699 with open(self.file4_path, 'w', encoding='utf-8') as file4: 700 file4.write(cgi_file4 % (self.pythonexe, 
'QUERY_STRING')) 701 os.chmod(self.file4_path, 0o777) 702 703 os.chdir(self.parent_dir) 704 705 def tearDown(self): 706 try: 707 os.chdir(self.cwd) 708 if self._pythonexe_symlink: 709 self._pythonexe_symlink.__exit__(None, None, None) 710 if self.nocgi_path: 711 os.remove(self.nocgi_path) 712 if self.file1_path: 713 os.remove(self.file1_path) 714 if self.file2_path: 715 os.remove(self.file2_path) 716 if self.file3_path: 717 os.remove(self.file3_path) 718 if self.file4_path: 719 os.remove(self.file4_path) 720 os.rmdir(self.cgi_child_dir) 721 os.rmdir(self.cgi_dir) 722 os.rmdir(self.parent_dir) 723 finally: 724 BaseTestCase.tearDown(self) 725 726 def test_url_collapse_path(self): 727 # verify tail is the last portion and head is the rest on proper urls 728 test_vectors = { 729 '': '//', 730 '..': IndexError, 731 '/.//..': IndexError, 732 '/': '//', 733 '//': '//', 734 '/\\': '//\\', 735 '/.//': '//', 736 'cgi-bin/file1.py': '/cgi-bin/file1.py', 737 '/cgi-bin/file1.py': '/cgi-bin/file1.py', 738 'a': '//a', 739 '/a': '//a', 740 '//a': '//a', 741 './a': '//a', 742 './C:/': '/C:/', 743 '/a/b': '/a/b', 744 '/a/b/': '/a/b/', 745 '/a/b/.': '/a/b/', 746 '/a/b/c/..': '/a/b/', 747 '/a/b/c/../d': '/a/b/d', 748 '/a/b/c/../d/e/../f': '/a/b/d/f', 749 '/a/b/c/../d/e/../../f': '/a/b/f', 750 '/a/b/c/../d/e/.././././..//f': '/a/b/f', 751 '../a/b/c/../d/e/.././././..//f': IndexError, 752 '/a/b/c/../d/e/../../../f': '/a/f', 753 '/a/b/c/../d/e/../../../../f': '//f', 754 '/a/b/c/../d/e/../../../../../f': IndexError, 755 '/a/b/c/../d/e/../../../../f/..': '//', 756 '/a/b/c/../d/e/../../../../f/../.': '//', 757 } 758 for path, expected in test_vectors.items(): 759 if isinstance(expected, type) and issubclass(expected, Exception): 760 self.assertRaises(expected, 761 server._url_collapse_path, path) 762 else: 763 actual = server._url_collapse_path(path) 764 self.assertEqual(expected, actual, 765 msg='path = %r\nGot: %r\nWanted: %r' % 766 (path, actual, expected)) 767 768 def 
test_headers_and_content(self): 769 res = self.request('/cgi-bin/file1.py') 770 self.assertEqual( 771 (res.read(), res.getheader('Content-type'), res.status), 772 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK)) 773 774 def test_issue19435(self): 775 res = self.request('///////////nocgi.py/../cgi-bin/nothere.sh') 776 self.assertEqual(res.status, HTTPStatus.NOT_FOUND) 777 778 def test_post(self): 779 params = urllib.parse.urlencode( 780 {'spam' : 1, 'eggs' : 'python', 'bacon' : 123456}) 781 headers = {'Content-type' : 'application/x-www-form-urlencoded'} 782 res = self.request('/cgi-bin/file2.py', 'POST', params, headers) 783 784 self.assertEqual(res.read(), b'1, python, 123456' + self.linesep) 785 786 def test_invaliduri(self): 787 res = self.request('/cgi-bin/invalid') 788 res.read() 789 self.assertEqual(res.status, HTTPStatus.NOT_FOUND) 790 791 def test_authorization(self): 792 headers = {b'Authorization' : b'Basic ' + 793 base64.b64encode(b'username:pass')} 794 res = self.request('/cgi-bin/file1.py', 'GET', headers=headers) 795 self.assertEqual( 796 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 797 (res.read(), res.getheader('Content-type'), res.status)) 798 799 def test_no_leading_slash(self): 800 # http://bugs.python.org/issue2254 801 res = self.request('cgi-bin/file1.py') 802 self.assertEqual( 803 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 804 (res.read(), res.getheader('Content-type'), res.status)) 805 806 def test_os_environ_is_not_altered(self): 807 signature = "Test CGI Server" 808 os.environ['SERVER_SOFTWARE'] = signature 809 res = self.request('/cgi-bin/file1.py') 810 self.assertEqual( 811 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 812 (res.read(), res.getheader('Content-type'), res.status)) 813 self.assertEqual(os.environ['SERVER_SOFTWARE'], signature) 814 815 def test_urlquote_decoding_in_cgi_check(self): 816 res = self.request('/cgi-bin%2ffile1.py') 817 self.assertEqual( 818 (b'Hello 
World' + self.linesep, 'text/html', HTTPStatus.OK), 819 (res.read(), res.getheader('Content-type'), res.status)) 820 821 def test_nested_cgi_path_issue21323(self): 822 res = self.request('/cgi-bin/child-dir/file3.py') 823 self.assertEqual( 824 (b'Hello World' + self.linesep, 'text/html', HTTPStatus.OK), 825 (res.read(), res.getheader('Content-type'), res.status)) 826 827 def test_query_with_multiple_question_mark(self): 828 res = self.request('/cgi-bin/file4.py?a=b?c=d') 829 self.assertEqual( 830 (b'a=b?c=d' + self.linesep, 'text/html', HTTPStatus.OK), 831 (res.read(), res.getheader('Content-type'), res.status)) 832 833 def test_query_with_continuous_slashes(self): 834 res = self.request('/cgi-bin/file4.py?k=aa%2F%2Fbb&//q//p//=//a//b//') 835 self.assertEqual( 836 (b'k=aa%2F%2Fbb&//q//p//=//a//b//' + self.linesep, 837 'text/html', HTTPStatus.OK), 838 (res.read(), res.getheader('Content-type'), res.status)) 839 840 841class SocketlessRequestHandler(SimpleHTTPRequestHandler): 842 def __init__(self, *args, **kwargs): 843 request = mock.Mock() 844 request.makefile.return_value = BytesIO() 845 super().__init__(request, None, None) 846 847 self.get_called = False 848 self.protocol_version = "HTTP/1.1" 849 850 def do_GET(self): 851 self.get_called = True 852 self.send_response(HTTPStatus.OK) 853 self.send_header('Content-Type', 'text/html') 854 self.end_headers() 855 self.wfile.write(b'<html><body>Data</body></html>\r\n') 856 857 def log_message(self, format, *args): 858 pass 859 860class RejectingSocketlessRequestHandler(SocketlessRequestHandler): 861 def handle_expect_100(self): 862 self.send_error(HTTPStatus.EXPECTATION_FAILED) 863 return False 864 865 866class AuditableBytesIO: 867 868 def __init__(self): 869 self.datas = [] 870 871 def write(self, data): 872 self.datas.append(data) 873 874 def getData(self): 875 return b''.join(self.datas) 876 877 @property 878 def numWrites(self): 879 return len(self.datas) 880 881 882class 
class BaseHTTPRequestHandlerTestCase(unittest.TestCase):
    """Test the functionality of the BaseHTTPServer.

    Requests are fed to a SocketlessRequestHandler through in-memory
    streams and the raw response lines are inspected.  The cases cover
    plain GET requests, the Expect 100-continue header, buffering of
    response headers, and oversized or malformed requests.
    """

    HTTPResponseMatch = re.compile(b'HTTP/1.[0-9]+ 200 OK')

    def setUp(self):
        self.handler = SocketlessRequestHandler()

    # -- helpers ---------------------------------------------------------

    def send_typical_request(self, message):
        """Run one request through the handler; return the response lines."""
        request_stream = BytesIO(message)
        response_stream = BytesIO()
        self.handler.rfile = request_stream
        self.handler.wfile = response_stream
        self.handler.handle_one_request()
        response_stream.seek(0)
        return list(response_stream)

    def verify_get_called(self):
        """Assert that the handler's do_GET ran."""
        self.assertTrue(self.handler.get_called)

    def verify_expected_headers(self, headers):
        """Assert Server, Date and Content-Type each appear exactly once."""
        for prefix in (b'Server: ', b'Date: ', b'Content-Type: '):
            matching = [line for line in headers if line.startswith(prefix)]
            self.assertEqual(len(matching), 1)

    def verify_http_server_response(self, response):
        """Assert that *response* contains a 200 OK status line."""
        self.assertIsNotNone(self.HTTPResponseMatch.search(response))

    def _check_successful_get(self, reply):
        """Check status line, headers, do_GET call and body of *reply*."""
        self.verify_http_server_response(reply[0])
        self.verify_expected_headers(reply[1:-1])
        self.verify_get_called()
        self.assertEqual(reply[-1], b'<html><body>Data</body></html>\r\n')

    def _check_parsed_get(self, version, header_items=()):
        """Check the attributes the handler parsed from ``GET / <version>``."""
        handler = self.handler
        self.assertEqual(handler.requestline, 'GET / ' + version)
        self.assertEqual(handler.command, 'GET')
        self.assertEqual(handler.path, '/')
        self.assertEqual(handler.request_version, version)
        self.assertSequenceEqual(handler.headers.items(), header_items)

    def _audited_handler(self):
        """Return a fresh handler and the AuditableBytesIO it writes to."""
        request_stream = BytesIO(b'GET / HTTP/1.1\r\n\r\n')
        sink = AuditableBytesIO()
        handler = SocketlessRequestHandler()
        handler.rfile = request_stream
        handler.wfile = sink
        handler.request_version = 'HTTP/1.1'
        return handler, sink

    # -- plain requests --------------------------------------------------

    def test_http_1_1(self):
        reply = self.send_typical_request(b'GET / HTTP/1.1\r\n\r\n')
        self._check_successful_get(reply)
        self._check_parsed_get('HTTP/1.1')

    def test_http_1_0(self):
        reply = self.send_typical_request(b'GET / HTTP/1.0\r\n\r\n')
        self._check_successful_get(reply)
        self._check_parsed_get('HTTP/1.0')

    def test_http_0_9(self):
        reply = self.send_typical_request(b'GET / HTTP/0.9\r\n\r\n')
        # Only the body line is expected in the output.
        self.assertEqual(len(reply), 1)
        self.assertEqual(reply[0], b'<html><body>Data</body></html>\r\n')
        self.verify_get_called()

    def test_extra_space(self):
        reply = self.send_typical_request(
            b'GET /spaced out HTTP/1.1\r\n'
            b'Host: dummy\r\n'
            b'\r\n'
        )
        self.assertTrue(reply[0].startswith(b'HTTP/1.1 400 '))
        # Headers run from after the status line up to the first blank line.
        end_of_headers = reply.index(b'\r\n')
        self.verify_expected_headers(reply[1:end_of_headers])
        self.assertFalse(self.handler.get_called)

    # -- Expect: 100-continue --------------------------------------------

    def test_with_continue_1_0(self):
        reply = self.send_typical_request(
            b'GET / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n')
        self._check_successful_get(reply)
        self._check_parsed_get('HTTP/1.0', (("Expect", "100-continue"),))

    def test_with_continue_1_1(self):
        reply = self.send_typical_request(
            b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
        self.assertEqual(reply[0], b'HTTP/1.1 100 Continue\r\n')
        self.assertEqual(reply[1], b'\r\n')
        self.assertEqual(reply[2], b'HTTP/1.1 200 OK\r\n')
        self.verify_expected_headers(reply[2:-1])
        self.verify_get_called()
        self.assertEqual(reply[-1], b'<html><body>Data</body></html>\r\n')
        self._check_parsed_get('HTTP/1.1', (("Expect", "100-continue"),))

    # -- header buffering ------------------------------------------------

    def test_header_buffering_of_send_error(self):
        handler, sink = self._audited_handler()
        handler.requestline = ''
        handler.command = None

        handler.send_error(418)
        self.assertEqual(sink.numWrites, 2)

    def test_header_buffering_of_send_response_only(self):
        handler, sink = self._audited_handler()

        handler.send_response_only(418)
        self.assertEqual(sink.numWrites, 0)
        handler.end_headers()
        self.assertEqual(sink.numWrites, 1)

    def test_header_buffering_of_send_header(self):
        handler, sink = self._audited_handler()

        handler.send_header('Foo', 'foo')
        handler.send_header('bar', 'bar')
        self.assertEqual(sink.numWrites, 0)
        handler.end_headers()
        self.assertEqual(sink.getData(), b'Foo: foo\r\nbar: bar\r\n\r\n')
        self.assertEqual(sink.numWrites, 1)

    def test_header_unbuffered_when_continue(self):
        request_stream = BytesIO(
            b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
        response_stream = BytesIO()
        self.handler.rfile = request_stream
        self.handler.wfile = response_stream
        self.handler.request_version = 'HTTP/1.1'

        self.handler.handle_one_request()
        # getvalue() reads the whole buffer without moving the position.
        self.assertNotEqual(response_stream.getvalue(), b'')
        pieces = response_stream.getvalue().split(b'\r\n')
        self.assertEqual(pieces[0], b'HTTP/1.1 100 Continue')
        self.assertEqual(pieces[1], b'')
        self.assertEqual(pieces[2], b'HTTP/1.1 200 OK')

    def test_with_continue_rejected(self):
        saved_handler = self.handler  # Save to avoid breaking any subsequent tests.
        self.handler = RejectingSocketlessRequestHandler()
        reply = self.send_typical_request(
            b'GET / HTTP/1.1\r\nExpect: 100-continue\r\n\r\n')
        self.assertEqual(reply[0], b'HTTP/1.1 417 Expectation Failed\r\n')
        header_lines = reply[1:-1]
        self.verify_expected_headers(header_lines)
        # The expect handler should short circuit the usual get method by
        # returning false here, so get_called should be false
        self.assertFalse(self.handler.get_called)
        self.assertEqual(reply[1:-1].count(b'Connection: close\r\n'), 1)
        self.handler = saved_handler  # Restore to avoid breaking any subsequent tests.

    # -- limits and malformed input --------------------------------------

    def test_request_length(self):
        # Issue #10714: huge request lines are discarded, to avoid Denial
        # of Service attacks.
        oversized = b'GET ' + b'x' * 65537
        reply = self.send_typical_request(oversized)
        self.assertEqual(reply[0], b'HTTP/1.1 414 Request-URI Too Long\r\n')
        self.assertFalse(self.handler.get_called)
        self.assertIsInstance(self.handler.requestline, str)

    def test_header_length(self):
        # Issue #6791: same for headers
        oversized = b'GET / HTTP/1.1\r\nX-Foo: bar' + b'r' * 65537 + b'\r\n\r\n'
        reply = self.send_typical_request(oversized)
        self.assertEqual(reply[0], b'HTTP/1.1 431 Line too long\r\n')
        self.assertFalse(self.handler.get_called)
        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')

    def test_too_many_headers(self):
        flood = b'GET / HTTP/1.1\r\n' + b'X-Foo: bar\r\n' * 101 + b'\r\n'
        reply = self.send_typical_request(flood)
        self.assertEqual(reply[0], b'HTTP/1.1 431 Too many headers\r\n')
        self.assertFalse(self.handler.get_called)
        self.assertEqual(self.handler.requestline, 'GET / HTTP/1.1')

    def test_html_escape_on_error(self):
        reply = self.send_typical_request(
            b'<script>alert("hello")</script> / HTTP/1.1')
        whole_reply = b''.join(reply)
        text = '<script>alert("hello")</script>'
        escaped = html.escape(text, quote=False).encode('ascii')
        self.assertIn(escaped, whole_reply)

    # -- miscellaneous ---------------------------------------------------

    def test_close_connection(self):
        # handle_one_request() should be repeatedly called until
        # it sets close_connection
        def handle_one_request():
            self.handler.close_connection = next(close_values)
        self.handler.handle_one_request = handle_one_request

        for scripted in ((True,), (False, False, True)):
            close_values = iter(scripted)
            self.handler.handle()
            # Every scripted value must have been consumed by handle().
            with self.assertRaises(StopIteration):
                next(close_values)

    def test_date_time_string(self):
        now = time.time()
        # this is the old code that formats the timestamp
        utc = time.gmtime(now)
        expected = "%s, %02d %3s %4d %02d:%02d:%02d GMT" % (
            self.handler.weekdayname[utc.tm_wday],
            utc.tm_mday,
            self.handler.monthname[utc.tm_mon],
            utc.tm_year, utc.tm_hour, utc.tm_min, utc.tm_sec
        )
        self.assertEqual(self.handler.date_time_string(timestamp=now), expected)
class SimpleHTTPRequestHandlerTestCase(unittest.TestCase):
    """ Test url parsing """

    def setUp(self):
        # Every case expects translate_path() to yield <cwd>/filename.
        self.translated = os.path.join(os.getcwd(), 'filename')
        self.handler = SocketlessRequestHandler()

    def test_query_arguments(self):
        """Query strings and fragments do not change the translated path."""
        for url in ('/filename',
                    '/filename?foo=bar',
                    '/filename?a=b&spam=eggs#zot'):
            self.assertEqual(self.handler.translate_path(url),
                             self.translated)

    def test_start_with_double_slash(self):
        """A leading double slash translates like a single one."""
        for url in ('//filename', '//filename?foo=bar'):
            self.assertEqual(self.handler.translate_path(url),
                             self.translated)

    def test_windows_colon(self):
        """Drive-like and backslash components are dropped under ntpath."""
        urls = (
            'c:c:c:foo/filename',
            '\\c:../filename',
            'c:\\c:..\\foo/filename',
            'c:c:foo\\c:c:bar/filename',
        )
        # While the block is active, server.os.path is ntpath.
        with support.swap_attr(server.os, 'path', ntpath):
            for url in urls:
                translated = self.handler.translate_path(url)
                # Normalise separators before comparing with the path
                # that setUp built.
                translated = translated.replace(ntpath.sep, os.sep)
                self.assertEqual(translated, self.translated)
class MiscTestCase(unittest.TestCase):
    """Check that http.server's __all__ matches its public names."""

    def test_all(self):
        # Public names that are deliberately left out of __all__.
        blacklist = {'executable', 'nobody_uid', 'test'}
        expected = [
            name
            for name in dir(server)
            if not (name.startswith('_') or name in blacklist)
            and getattr(getattr(server, name), '__module__', None)
            == 'http.server'
        ]
        self.assertCountEqual(server.__all__, expected)


class ScriptTestCase(unittest.TestCase):
    """Check the address_family set on the server class by server.test()."""

    ipv6_addrs = (
        "::",
        "2001:0db8:85a3:0000:0000:8a2e:0370:7334",
        "::1",
    )

    ipv4_addrs = (
        "0.0.0.0",
        "8.8.8.8",
        "127.0.0.1",
    )

    def mock_server_class(self):
        """Return a MagicMock usable in place of a server class.

        Calling it yields an object whose __enter__ returns a mock with a
        ``socket`` attribute; that socket's getsockname() returns ('', 0).
        """
        fake_socket = mock.MagicMock(getsockname=lambda: ('', 0))
        entered_server = mock.MagicMock(socket=fake_socket)
        server_instance = mock.MagicMock(
            __enter__=mock.MagicMock(return_value=entered_server),
        )
        return mock.MagicMock(return_value=server_instance)

    def _address_family_for(self, bind):
        """Run server.test() with a mock server class bound to *bind*."""
        fake_server_class = self.mock_server_class()
        server.test(ServerClass=fake_server_class, bind=bind)
        return fake_server_class.address_family

    @mock.patch('builtins.print')
    def test_server_test_unspec(self, _):
        family = self._address_family_for(None)
        self.assertIn(family, (socket.AF_INET6, socket.AF_INET))

    @mock.patch('builtins.print')
    def test_server_test_localhost(self, _):
        family = self._address_family_for("localhost")
        self.assertIn(family, (socket.AF_INET6, socket.AF_INET))

    @mock.patch('builtins.print')
    def test_server_test_ipv6(self, _):
        for bind in self.ipv6_addrs:
            self.assertEqual(self._address_family_for(bind),
                             socket.AF_INET6)

    @mock.patch('builtins.print')
    def test_server_test_ipv4(self, _):
        for bind in self.ipv4_addrs:
            self.assertEqual(self._address_family_for(bind),
                             socket.AF_INET)
def test_main(verbose=None):
    """Run every test case class of this module via support.run_unittest.

    The working directory in effect on entry is restored afterwards,
    whether or not the run raises.  *verbose* is accepted but unused.
    """
    starting_dir = os.getcwd()
    try:
        test_cases = (
            RequestHandlerLoggingTestCase,
            BaseHTTPRequestHandlerTestCase,
            BaseHTTPServerTestCase,
            SimpleHTTPServerTestCase,
            CGIHTTPServerTestCase,
            SimpleHTTPRequestHandlerTestCase,
            MiscTestCase,
            ScriptTestCase,
        )
        support.run_unittest(*test_cases)
    finally:
        os.chdir(starting_dir)


if __name__ == '__main__':
    test_main()