1import base64 2import os 3import email 4import urllib.parse 5import urllib.request 6import http.server 7import threading 8import unittest 9import hashlib 10 11from test import support 12from test.support import hashlib_helper 13 14try: 15 import ssl 16except ImportError: 17 ssl = None 18 19here = os.path.dirname(__file__) 20# Self-signed cert file for 'localhost' 21CERT_localhost = os.path.join(here, 'keycert.pem') 22# Self-signed cert file for 'fakehostname' 23CERT_fakehostname = os.path.join(here, 'keycert2.pem') 24 25 26# Loopback http server infrastructure 27 28class LoopbackHttpServer(http.server.HTTPServer): 29 """HTTP server w/ a few modifications that make it useful for 30 loopback testing purposes. 31 """ 32 33 def __init__(self, server_address, RequestHandlerClass): 34 http.server.HTTPServer.__init__(self, 35 server_address, 36 RequestHandlerClass) 37 38 # Set the timeout of our listening socket really low so 39 # that we can stop the server easily. 40 self.socket.settimeout(0.1) 41 42 def get_request(self): 43 """HTTPServer method, overridden.""" 44 45 request, client_address = self.socket.accept() 46 47 # It's a loopback connection, so setting the timeout 48 # really low shouldn't affect anything, but should make 49 # deadlocks less likely to occur. 50 request.settimeout(10.0) 51 52 return (request, client_address) 53 54class LoopbackHttpServerThread(threading.Thread): 55 """Stoppable thread that runs a loopback http server.""" 56 57 def __init__(self, request_handler): 58 threading.Thread.__init__(self) 59 self._stop_server = False 60 self.ready = threading.Event() 61 request_handler.protocol_version = "HTTP/1.0" 62 self.httpd = LoopbackHttpServer(("127.0.0.1", 0), 63 request_handler) 64 self.port = self.httpd.server_port 65 66 def stop(self): 67 """Stops the webserver if it's currently running.""" 68 69 self._stop_server = True 70 71 self.join() 72 self.httpd.server_close() 73 74 def run(self): 75 self.ready.set() 76 while not self._stop_server: 77 self.httpd.handle_request() 78 79# Authentication infrastructure 80 81class DigestAuthHandler: 82 """Handler for performing digest authentication.""" 83 84 def __init__(self): 85 self._request_num = 0 86 self._nonces = [] 87 self._users = {} 88 self._realm_name = "Test Realm" 89 self._qop = "auth" 90 91 def set_qop(self, qop): 92 self._qop = qop 93 94 def set_users(self, users): 95 assert isinstance(users, dict) 96 self._users = users 97 98 def set_realm(self, realm): 99 self._realm_name = realm 100 101 def _generate_nonce(self): 102 self._request_num += 1 103 nonce = hashlib.md5(str(self._request_num).encode("ascii")).hexdigest() 104 self._nonces.append(nonce) 105 return nonce 106 107 def _create_auth_dict(self, auth_str): 108 first_space_index = auth_str.find(" ") 109 auth_str = auth_str[first_space_index+1:] 110 111 parts = auth_str.split(",") 112 113 auth_dict = {} 114 for part in parts: 115 name, value = part.split("=") 116 name = name.strip() 117 if value[0] == '"' and value[-1] == '"': 118 value = value[1:-1] 119 else: 120 value = value.strip() 121 auth_dict[name] = value 122 return auth_dict 123 124 def _validate_auth(self, auth_dict, password, method, uri): 125 final_dict = {} 126 final_dict.update(auth_dict) 127 final_dict["password"] = password 128 final_dict["method"] = method 129 final_dict["uri"] = uri 130 HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict 131 HA1 = hashlib.md5(HA1_str.encode("ascii")).hexdigest() 132 HA2_str = "%(method)s:%(uri)s" % final_dict 133 HA2 = hashlib.md5(HA2_str.encode("ascii")).hexdigest() 134 final_dict["HA1"] = HA1 135 final_dict["HA2"] = HA2 136 response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \ 137 "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict 138 response = hashlib.md5(response_str.encode("ascii")).hexdigest() 139 140 return response == auth_dict["response"] 141 142 def _return_auth_challenge(self, request_handler): 143 request_handler.send_response(407, "Proxy Authentication Required") 144 request_handler.send_header("Content-Type", "text/html") 145 request_handler.send_header( 146 'Proxy-Authenticate', 'Digest realm="%s", ' 147 'qop="%s",' 148 'nonce="%s", ' % \ 149 (self._realm_name, self._qop, self._generate_nonce())) 150 # XXX: Not sure if we're supposed to add this next header or 151 # not. 152 #request_handler.send_header('Connection', 'close') 153 request_handler.end_headers() 154 request_handler.wfile.write(b"Proxy Authentication Required.") 155 return False 156 157 def handle_request(self, request_handler): 158 """Performs digest authentication on the given HTTP request 159 handler. Returns True if authentication was successful, False 160 otherwise. 161 162 If no users have been set, then digest auth is effectively 163 disabled and this method will always return True. 164 """ 165 166 if len(self._users) == 0: 167 return True 168 169 if "Proxy-Authorization" not in request_handler.headers: 170 return self._return_auth_challenge(request_handler) 171 else: 172 auth_dict = self._create_auth_dict( 173 request_handler.headers["Proxy-Authorization"] 174 ) 175 if auth_dict["username"] in self._users: 176 password = self._users[ auth_dict["username"] ] 177 else: 178 return self._return_auth_challenge(request_handler) 179 if not auth_dict.get("nonce") in self._nonces: 180 return self._return_auth_challenge(request_handler) 181 else: 182 self._nonces.remove(auth_dict["nonce"]) 183 184 auth_validated = False 185 186 # MSIE uses short_path in its validation, but Python's 187 # urllib.request uses the full path, so we're going to see if 188 # either of them works here. 189 190 for path in [request_handler.path, request_handler.short_path]: 191 if self._validate_auth(auth_dict, 192 password, 193 request_handler.command, 194 path): 195 auth_validated = True 196 197 if not auth_validated: 198 return self._return_auth_challenge(request_handler) 199 return True 200 201 202class BasicAuthHandler(http.server.BaseHTTPRequestHandler): 203 """Handler for performing basic authentication.""" 204 # Server side values 205 USER = 'testUser' 206 PASSWD = 'testPass' 207 REALM = 'Test' 208 USER_PASSWD = "%s:%s" % (USER, PASSWD) 209 ENCODED_AUTH = base64.b64encode(USER_PASSWD.encode('ascii')).decode('ascii') 210 211 def __init__(self, *args, **kwargs): 212 http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs) 213 214 def log_message(self, format, *args): 215 # Suppress console log message 216 pass 217 218 def do_HEAD(self): 219 self.send_response(200) 220 self.send_header("Content-type", "text/html") 221 self.end_headers() 222 223 def do_AUTHHEAD(self): 224 self.send_response(401) 225 self.send_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.REALM) 226 self.send_header("Content-type", "text/html") 227 self.end_headers() 228 229 def do_GET(self): 230 if not self.headers.get("Authorization", ""): 231 self.do_AUTHHEAD() 232 self.wfile.write(b"No Auth header received") 233 elif self.headers.get( 234 "Authorization", "") == "Basic " + self.ENCODED_AUTH: 235 self.send_response(200) 236 self.end_headers() 237 self.wfile.write(b"It works") 238 else: 239 # Request Unauthorized 240 self.do_AUTHHEAD() 241 242 243 244# Proxy test infrastructure 245 246class FakeProxyHandler(http.server.BaseHTTPRequestHandler): 247 """This is a 'fake proxy' that makes it look like the entire 248 internet has gone down due to a sudden zombie invasion. It main 249 utility is in providing us with authentication support for 250 testing. 251 """ 252 253 def __init__(self, digest_auth_handler, *args, **kwargs): 254 # This has to be set before calling our parent's __init__(), which will 255 # try to call do_GET(). 256 self.digest_auth_handler = digest_auth_handler 257 http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs) 258 259 def log_message(self, format, *args): 260 # Uncomment the next line for debugging. 261 # sys.stderr.write(format % args) 262 pass 263 264 def do_GET(self): 265 (scm, netloc, path, params, query, fragment) = urllib.parse.urlparse( 266 self.path, "http") 267 self.short_path = path 268 if self.digest_auth_handler.handle_request(self): 269 self.send_response(200, "OK") 270 self.send_header("Content-Type", "text/html") 271 self.end_headers() 272 self.wfile.write(bytes("You've reached %s!<BR>" % self.path, 273 "ascii")) 274 self.wfile.write(b"Our apologies, but our server is down due to " 275 b"a sudden zombie invasion.") 276 277# Test cases 278 279class BasicAuthTests(unittest.TestCase): 280 USER = "testUser" 281 PASSWD = "testPass" 282 INCORRECT_PASSWD = "Incorrect" 283 REALM = "Test" 284 285 def setUp(self): 286 super(BasicAuthTests, self).setUp() 287 # With Basic Authentication 288 def http_server_with_basic_auth_handler(*args, **kwargs): 289 return BasicAuthHandler(*args, **kwargs) 290 self.server = LoopbackHttpServerThread(http_server_with_basic_auth_handler) 291 self.addCleanup(self.stop_server) 292 self.server_url = 'http://127.0.0.1:%s' % self.server.port 293 self.server.start() 294 self.server.ready.wait() 295 296 def stop_server(self): 297 self.server.stop() 298 self.server = None 299 300 def tearDown(self): 301 super(BasicAuthTests, self).tearDown() 302 303 def test_basic_auth_success(self): 304 ah = urllib.request.HTTPBasicAuthHandler() 305 ah.add_password(self.REALM, self.server_url, self.USER, self.PASSWD) 306 urllib.request.install_opener(urllib.request.build_opener(ah)) 307 try: 308 self.assertTrue(urllib.request.urlopen(self.server_url)) 309 except urllib.error.HTTPError: 310 self.fail("Basic auth failed for the url: %s" % self.server_url) 311 312 def test_basic_auth_httperror(self): 313 ah = urllib.request.HTTPBasicAuthHandler() 314 ah.add_password(self.REALM, self.server_url, self.USER, self.INCORRECT_PASSWD) 315 urllib.request.install_opener(urllib.request.build_opener(ah)) 316 self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, self.server_url) 317 318 319@hashlib_helper.requires_hashdigest("md5") 320class ProxyAuthTests(unittest.TestCase): 321 URL = "http://localhost" 322 323 USER = "tester" 324 PASSWD = "test123" 325 REALM = "TestRealm" 326 327 def setUp(self): 328 super(ProxyAuthTests, self).setUp() 329 # Ignore proxy bypass settings in the environment. 330 def restore_environ(old_environ): 331 os.environ.clear() 332 os.environ.update(old_environ) 333 self.addCleanup(restore_environ, os.environ.copy()) 334 os.environ['NO_PROXY'] = '' 335 os.environ['no_proxy'] = '' 336 337 self.digest_auth_handler = DigestAuthHandler() 338 self.digest_auth_handler.set_users({self.USER: self.PASSWD}) 339 self.digest_auth_handler.set_realm(self.REALM) 340 # With Digest Authentication. 341 def create_fake_proxy_handler(*args, **kwargs): 342 return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs) 343 344 self.server = LoopbackHttpServerThread(create_fake_proxy_handler) 345 self.addCleanup(self.stop_server) 346 self.server.start() 347 self.server.ready.wait() 348 proxy_url = "http://127.0.0.1:%d" % self.server.port 349 handler = urllib.request.ProxyHandler({"http" : proxy_url}) 350 self.proxy_digest_handler = urllib.request.ProxyDigestAuthHandler() 351 self.opener = urllib.request.build_opener( 352 handler, self.proxy_digest_handler) 353 354 def stop_server(self): 355 self.server.stop() 356 self.server = None 357 358 def test_proxy_with_bad_password_raises_httperror(self): 359 self.proxy_digest_handler.add_password(self.REALM, self.URL, 360 self.USER, self.PASSWD+"bad") 361 self.digest_auth_handler.set_qop("auth") 362 self.assertRaises(urllib.error.HTTPError, 363 self.opener.open, 364 self.URL) 365 366 def test_proxy_with_no_password_raises_httperror(self): 367 self.digest_auth_handler.set_qop("auth") 368 self.assertRaises(urllib.error.HTTPError, 369 self.opener.open, 370 self.URL) 371 372 def test_proxy_qop_auth_works(self): 373 self.proxy_digest_handler.add_password(self.REALM, self.URL, 374 self.USER, self.PASSWD) 375 self.digest_auth_handler.set_qop("auth") 376 with self.opener.open(self.URL) as result: 377 while result.read(): 378 pass 379 380 def test_proxy_qop_auth_int_works_or_throws_urlerror(self): 381 self.proxy_digest_handler.add_password(self.REALM, self.URL, 382 self.USER, self.PASSWD) 383 self.digest_auth_handler.set_qop("auth-int") 384 try: 385 result = self.opener.open(self.URL) 386 except urllib.error.URLError: 387 # It's okay if we don't support auth-int, but we certainly 388 # shouldn't receive any kind of exception here other than 389 # a URLError. 390 pass 391 else: 392 with result: 393 while result.read(): 394 pass 395 396 397def GetRequestHandler(responses): 398 399 class FakeHTTPRequestHandler(http.server.BaseHTTPRequestHandler): 400 401 server_version = "TestHTTP/" 402 requests = [] 403 headers_received = [] 404 port = 80 405 406 def do_GET(self): 407 body = self.send_head() 408 while body: 409 done = self.wfile.write(body) 410 body = body[done:] 411 412 def do_POST(self): 413 content_length = self.headers["Content-Length"] 414 post_data = self.rfile.read(int(content_length)) 415 self.do_GET() 416 self.requests.append(post_data) 417 418 def send_head(self): 419 FakeHTTPRequestHandler.headers_received = self.headers 420 self.requests.append(self.path) 421 response_code, headers, body = responses.pop(0) 422 423 self.send_response(response_code) 424 425 for (header, value) in headers: 426 self.send_header(header, value % {'port':self.port}) 427 if body: 428 self.send_header("Content-type", "text/plain") 429 self.end_headers() 430 return body 431 self.end_headers() 432 433 def log_message(self, *args): 434 pass 435 436 437 return FakeHTTPRequestHandler 438 439 440class TestUrlopen(unittest.TestCase): 441 """Tests urllib.request.urlopen using the network. 442 443 These tests are not exhaustive. Assuming that testing using files does a 444 good job overall of some of the basic interface features. There are no 445 tests exercising the optional 'data' and 'proxies' arguments. No tests 446 for transparent redirection have been written. 447 """ 448 449 def setUp(self): 450 super(TestUrlopen, self).setUp() 451 452 # clear _opener global variable 453 self.addCleanup(urllib.request.urlcleanup) 454 455 # Ignore proxies for localhost tests. 456 def restore_environ(old_environ): 457 os.environ.clear() 458 os.environ.update(old_environ) 459 self.addCleanup(restore_environ, os.environ.copy()) 460 os.environ['NO_PROXY'] = '*' 461 os.environ['no_proxy'] = '*' 462 463 def urlopen(self, url, data=None, **kwargs): 464 l = [] 465 f = urllib.request.urlopen(url, data, **kwargs) 466 try: 467 # Exercise various methods 468 l.extend(f.readlines(200)) 469 l.append(f.readline()) 470 l.append(f.read(1024)) 471 l.append(f.read()) 472 finally: 473 f.close() 474 return b"".join(l) 475 476 def stop_server(self): 477 self.server.stop() 478 self.server = None 479 480 def start_server(self, responses=None): 481 if responses is None: 482 responses = [(200, [], b"we don't care")] 483 handler = GetRequestHandler(responses) 484 485 self.server = LoopbackHttpServerThread(handler) 486 self.addCleanup(self.stop_server) 487 self.server.start() 488 self.server.ready.wait() 489 port = self.server.port 490 handler.port = port 491 return handler 492 493 def start_https_server(self, responses=None, **kwargs): 494 if not hasattr(urllib.request, 'HTTPSHandler'): 495 self.skipTest('ssl support required') 496 from test.ssl_servers import make_https_server 497 if responses is None: 498 responses = [(200, [], b"we care a bit")] 499 handler = GetRequestHandler(responses) 500 server = make_https_server(self, handler_class=handler, **kwargs) 501 handler.port = server.port 502 return handler 503 504 def test_redirection(self): 505 expected_response = b"We got here..." 506 responses = [ 507 (302, [("Location", "http://localhost:%(port)s/somewhere_else")], 508 ""), 509 (200, [], expected_response) 510 ] 511 512 handler = self.start_server(responses) 513 data = self.urlopen("http://localhost:%s/" % handler.port) 514 self.assertEqual(data, expected_response) 515 self.assertEqual(handler.requests, ["/", "/somewhere_else"]) 516 517 def test_chunked(self): 518 expected_response = b"hello world" 519 chunked_start = ( 520 b'a\r\n' 521 b'hello worl\r\n' 522 b'1\r\n' 523 b'd\r\n' 524 b'0\r\n' 525 ) 526 response = [(200, [("Transfer-Encoding", "chunked")], chunked_start)] 527 handler = self.start_server(response) 528 data = self.urlopen("http://localhost:%s/" % handler.port) 529 self.assertEqual(data, expected_response) 530 531 def test_404(self): 532 expected_response = b"Bad bad bad..." 533 handler = self.start_server([(404, [], expected_response)]) 534 535 try: 536 self.urlopen("http://localhost:%s/weeble" % handler.port) 537 except urllib.error.URLError as f: 538 data = f.read() 539 f.close() 540 else: 541 self.fail("404 should raise URLError") 542 543 self.assertEqual(data, expected_response) 544 self.assertEqual(handler.requests, ["/weeble"]) 545 546 def test_200(self): 547 expected_response = b"pycon 2008..." 548 handler = self.start_server([(200, [], expected_response)]) 549 data = self.urlopen("http://localhost:%s/bizarre" % handler.port) 550 self.assertEqual(data, expected_response) 551 self.assertEqual(handler.requests, ["/bizarre"]) 552 553 def test_200_with_parameters(self): 554 expected_response = b"pycon 2008..." 555 handler = self.start_server([(200, [], expected_response)]) 556 data = self.urlopen("http://localhost:%s/bizarre" % handler.port, 557 b"get=with_feeling") 558 self.assertEqual(data, expected_response) 559 self.assertEqual(handler.requests, ["/bizarre", b"get=with_feeling"]) 560 561 def test_https(self): 562 handler = self.start_https_server() 563 context = ssl.create_default_context(cafile=CERT_localhost) 564 data = self.urlopen("https://localhost:%s/bizarre" % handler.port, context=context) 565 self.assertEqual(data, b"we care a bit") 566 567 def test_https_with_cafile(self): 568 handler = self.start_https_server(certfile=CERT_localhost) 569 with support.check_warnings(('', DeprecationWarning)): 570 # Good cert 571 data = self.urlopen("https://localhost:%s/bizarre" % handler.port, 572 cafile=CERT_localhost) 573 self.assertEqual(data, b"we care a bit") 574 # Bad cert 575 with self.assertRaises(urllib.error.URLError) as cm: 576 self.urlopen("https://localhost:%s/bizarre" % handler.port, 577 cafile=CERT_fakehostname) 578 # Good cert, but mismatching hostname 579 handler = self.start_https_server(certfile=CERT_fakehostname) 580 with self.assertRaises(urllib.error.URLError) as cm: 581 self.urlopen("https://localhost:%s/bizarre" % handler.port, 582 cafile=CERT_fakehostname) 583 584 def test_https_with_cadefault(self): 585 handler = self.start_https_server(certfile=CERT_localhost) 586 # Self-signed cert should fail verification with system certificate store 587 with support.check_warnings(('', DeprecationWarning)): 588 with self.assertRaises(urllib.error.URLError) as cm: 589 self.urlopen("https://localhost:%s/bizarre" % handler.port, 590 cadefault=True) 591 592 def test_https_sni(self): 593 if ssl is None: 594 self.skipTest("ssl module required") 595 if not ssl.HAS_SNI: 596 self.skipTest("SNI support required in OpenSSL") 597 sni_name = None 598 def cb_sni(ssl_sock, server_name, initial_context): 599 nonlocal sni_name 600 sni_name = server_name 601 context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) 602 context.set_servername_callback(cb_sni) 603 handler = self.start_https_server(context=context, certfile=CERT_localhost) 604 context = ssl.create_default_context(cafile=CERT_localhost) 605 self.urlopen("https://localhost:%s" % handler.port, context=context) 606 self.assertEqual(sni_name, "localhost") 607 608 def test_sending_headers(self): 609 handler = self.start_server() 610 req = urllib.request.Request("http://localhost:%s/" % handler.port, 611 headers={"Range": "bytes=20-39"}) 612 with urllib.request.urlopen(req): 613 pass 614 self.assertEqual(handler.headers_received["Range"], "bytes=20-39") 615 616 def test_basic(self): 617 handler = self.start_server() 618 with urllib.request.urlopen("http://localhost:%s" % handler.port) as open_url: 619 for attr in ("read", "close", "info", "geturl"): 620 self.assertTrue(hasattr(open_url, attr), "object returned from " 621 "urlopen lacks the %s attribute" % attr) 622 self.assertTrue(open_url.read(), "calling 'read' failed") 623 624 def test_info(self): 625 handler = self.start_server() 626 open_url = urllib.request.urlopen( 627 "http://localhost:%s" % handler.port) 628 with open_url: 629 info_obj = open_url.info() 630 self.assertIsInstance(info_obj, email.message.Message, 631 "object returned by 'info' is not an " 632 "instance of email.message.Message") 633 self.assertEqual(info_obj.get_content_subtype(), "plain") 634 635 def test_geturl(self): 636 # Make sure same URL as opened is returned by geturl. 637 handler = self.start_server() 638 open_url = urllib.request.urlopen("http://localhost:%s" % handler.port) 639 with open_url: 640 url = open_url.geturl() 641 self.assertEqual(url, "http://localhost:%s" % handler.port) 642 643 def test_iteration(self): 644 expected_response = b"pycon 2008..." 645 handler = self.start_server([(200, [], expected_response)]) 646 data = urllib.request.urlopen("http://localhost:%s" % handler.port) 647 for line in data: 648 self.assertEqual(line, expected_response) 649 650 def test_line_iteration(self): 651 lines = [b"We\n", b"got\n", b"here\n", b"verylong " * 8192 + b"\n"] 652 expected_response = b"".join(lines) 653 handler = self.start_server([(200, [], expected_response)]) 654 data = urllib.request.urlopen("http://localhost:%s" % handler.port) 655 for index, line in enumerate(data): 656 self.assertEqual(line, lines[index], 657 "Fetched line number %s doesn't match expected:\n" 658 " Expected length was %s, got %s" % 659 (index, len(lines[index]), len(line))) 660 self.assertEqual(index + 1, len(lines)) 661 662 663threads_key = None 664 665def setUpModule(): 666 # Store the threading_setup in a key and ensure that it is cleaned up 667 # in the tearDown 668 global threads_key 669 threads_key = support.threading_setup() 670 671def tearDownModule(): 672 if threads_key: 673 support.threading_cleanup(*threads_key) 674 675if __name__ == "__main__": 676 unittest.main() 677