"""Tests for urllib.request that run against HTTP(S) servers on loopback.

The module provides a small stoppable loopback HTTP server, helper request
handlers (basic auth, a digest-authenticating fake proxy, and a scripted
response handler) and the test cases that use them.
"""

import base64
import os
import email
import email.message
import urllib.error
import urllib.parse
import urllib.request
import http.server
import threading
import unittest
import hashlib

from test import support

try:
    import ssl
except ImportError:
    ssl = None

here = os.path.dirname(__file__)
# Self-signed cert file for 'localhost'
CERT_localhost = os.path.join(here, 'keycert.pem')
# Self-signed cert file for 'fakehostname'
CERT_fakehostname = os.path.join(here, 'keycert2.pem')


# Loopback http server infrastructure

class LoopbackHttpServer(http.server.HTTPServer):
    """HTTP server w/ a few modifications that make it useful for
    loopback testing purposes.
    """

    def __init__(self, server_address, RequestHandlerClass):
        http.server.HTTPServer.__init__(self,
                                        server_address,
                                        RequestHandlerClass)

        # Set the timeout of our listening socket really low so
        # that we can stop the server easily.
        self.socket.settimeout(0.1)

    def get_request(self):
        """HTTPServer method, overridden.

        Accepts one connection and returns ``(request, client_address)``
        after putting a timeout on the accepted socket.
        """

        request, client_address = self.socket.accept()

        # It's a loopback connection, so a bounded timeout shouldn't
        # affect anything, but should make deadlocks less likely to
        # occur.
        request.settimeout(10.0)

        return (request, client_address)


class LoopbackHttpServerThread(threading.Thread):
    """Stoppable thread that runs a loopback http server."""

    def __init__(self, request_handler):
        """Create the server bound to 127.0.0.1 on port 0.

        *request_handler* is the callable handed to the HTTP server as its
        request handler class; its ``protocol_version`` attribute is set to
        "HTTP/1.0".  The port actually in use is published as ``self.port``.
        """
        threading.Thread.__init__(self)
        self._stop_server = False
        self.ready = threading.Event()
        request_handler.protocol_version = "HTTP/1.0"
        self.httpd = LoopbackHttpServer(("127.0.0.1", 0),
                                        request_handler)
        self.port = self.httpd.server_port

    def stop(self):
        """Stops the webserver if it's currently running."""

        # run() re-checks this flag between handle_request() calls.
        self._stop_server = True

        self.join()
        self.httpd.server_close()

    def run(self):
        """Signal readiness, then serve requests until stop() is called."""
        self.ready.set()
        while not self._stop_server:
            self.httpd.handle_request()


# Authentication infrastructure

class DigestAuthHandler:
    """Handler for performing digest authentication."""

    # Fields of the Proxy-Authorization header that _validate_auth() reads.
    _REQUIRED_FIELDS = ("username", "realm", "nonce", "nc",
                        "cnonce", "qop", "response")

    def __init__(self):
        self._request_num = 0
        self._nonces = []
        self._users = {}
        self._realm_name = "Test Realm"
        self._qop = "auth"

    def set_qop(self, qop):
        """Set the qop value advertised in authentication challenges."""
        self._qop = qop

    def set_users(self, users):
        """Set the ``{username: password}`` mapping of accepted users."""
        assert isinstance(users, dict)
        self._users = users

    def set_realm(self, realm):
        """Set the realm name advertised in authentication challenges."""
        self._realm_name = realm

    def _generate_nonce(self):
        """Return a new nonce and remember it as outstanding.

        The nonce is the MD5 hex digest of an incrementing request counter.
        """
        self._request_num += 1
        nonce = hashlib.md5(str(self._request_num).encode("ascii")).hexdigest()
        self._nonces.append(nonce)
        return nonce

    def _create_auth_dict(self, auth_str):
        """Parse an authorization header value into a ``{name: value}`` dict.

        Everything up to and including the first space (the auth scheme) is
        dropped, the remainder is split on commas and each ``name=value``
        part is stored with surrounding whitespace and one pair of enclosing
        double quotes removed.  Parts without an "=" are ignored.
        """
        first_space_index = auth_str.find(" ")
        auth_str = auth_str[first_space_index+1:]

        auth_dict = {}
        for part in auth_str.split(","):
            # Split on the first "=" only: values such as
            # uri="/path?a=b" legitimately contain further "=" characters.
            name, sep, value = part.partition("=")
            if not sep:
                # Empty or malformed part (e.g. after a trailing comma).
                continue
            value = value.strip()
            if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
                value = value[1:-1]
            auth_dict[name.strip()] = value
        return auth_dict

    def _validate_auth(self, auth_dict, password, method, uri):
        """Return True if auth_dict["response"] matches the expected digest.

        The expected value is computed with MD5 from the username, realm,
        nonce, nc, cnonce and qop fields of *auth_dict* together with
        *password*, *method* and *uri*.  A header lacking any of the needed
        fields is treated as not valid.
        """
        if any(field not in auth_dict for field in self._REQUIRED_FIELDS):
            return False

        # The explicit arguments take precedence over same-named header
        # fields (notably "uri").
        final_dict = dict(auth_dict)
        final_dict["password"] = password
        final_dict["method"] = method
        final_dict["uri"] = uri
        HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
        HA1 = hashlib.md5(HA1_str.encode("ascii")).hexdigest()
        HA2_str = "%(method)s:%(uri)s" % final_dict
        HA2 = hashlib.md5(HA2_str.encode("ascii")).hexdigest()
        final_dict["HA1"] = HA1
        final_dict["HA2"] = HA2
        response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
                       "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
        response = hashlib.md5(response_str.encode("ascii")).hexdigest()

        return response == auth_dict["response"]

    def _return_auth_challenge(self, request_handler):
        """Send a 407 response carrying a fresh digest challenge.

        Always returns False so callers can return the result directly.
        """
        request_handler.send_response(407, "Proxy Authentication Required")
        request_handler.send_header("Content-Type", "text/html")
        request_handler.send_header(
            'Proxy-Authenticate', 'Digest realm="%s", '
            'qop="%s",'
            'nonce="%s", ' % \
            (self._realm_name, self._qop, self._generate_nonce()))
        # XXX: Not sure if we're supposed to add this next header or
        # not.
        #request_handler.send_header('Connection', 'close')
        request_handler.end_headers()
        request_handler.wfile.write(b"Proxy Authentication Required.")
        return False

    def handle_request(self, request_handler):
        """Performs digest authentication on the given HTTP request
        handler.  Returns True if authentication was successful, False
        otherwise.

        If no users have been set, then digest auth is effectively
        disabled and this method will always return True.

        Whenever False is returned, a 407 challenge has already been
        written to the client.
        """

        if not self._users:
            return True

        if "Proxy-Authorization" not in request_handler.headers:
            return self._return_auth_challenge(request_handler)

        auth_dict = self._create_auth_dict(
            request_handler.headers["Proxy-Authorization"]
            )

        # An absent username is handled like an unknown one.
        username = auth_dict.get("username")
        if username not in self._users:
            return self._return_auth_challenge(request_handler)
        password = self._users[username]

        # Each nonce is accepted once only.
        nonce = auth_dict.get("nonce")
        if nonce not in self._nonces:
            return self._return_auth_challenge(request_handler)
        self._nonces.remove(nonce)

        # MSIE uses short_path in its validation, but Python's
        # urllib.request uses the full path, so we're going to see if
        # either of them works here.
        candidate_paths = (request_handler.path, request_handler.short_path)
        auth_validated = any(
            self._validate_auth(auth_dict, password,
                                request_handler.command, path)
            for path in candidate_paths)

        if not auth_validated:
            return self._return_auth_challenge(request_handler)
        return True


class BasicAuthHandler(http.server.BaseHTTPRequestHandler):
    """Handler for performing basic authentication."""
    # Server side values
    USER = 'testUser'
    PASSWD = 'testPass'
    REALM = 'Test'
    USER_PASSWD = "%s:%s" % (USER, PASSWD)
    ENCODED_AUTH = base64.b64encode(USER_PASSWD.encode('ascii')).decode('ascii')

    def __init__(self, *args, **kwargs):
        http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)

    def log_message(self, format, *args):
        # Suppress console log message
        pass

    def do_HEAD(self):
        """Send a 200 response with headers only."""
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()

    def do_AUTHHEAD(self):
        """Send a 401 response asking for basic auth in REALM."""
        self.send_response(401)
        self.send_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.REALM)
        self.send_header("Content-type", "text/html")
        self.end_headers()

    def do_GET(self):
        """Answer 200 when the expected credentials are sent, else 401."""
        authorization = self.headers.get("Authorization", "")
        if not authorization:
            self.do_AUTHHEAD()
            self.wfile.write(b"No Auth header received")
        elif authorization == "Basic " + self.ENCODED_AUTH:
            self.send_response(200)
            self.end_headers()
            self.wfile.write(b"It works")
        else:
            # Request Unauthorized
            self.do_AUTHHEAD()



# Proxy test infrastructure

class FakeProxyHandler(http.server.BaseHTTPRequestHandler):
    """This is a 'fake proxy' that makes it look like the entire
    internet has gone down due to a sudden zombie invasion.  Its main
    utility is in providing us with authentication support for
    testing.
    """

    def __init__(self, digest_auth_handler, *args, **kwargs):
        # This has to be set before calling our parent's __init__(), which will
        # try to call do_GET().
        self.digest_auth_handler = digest_auth_handler
        http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)

    def log_message(self, format, *args):
        # Uncomment the next line for debugging (this also needs
        # "import sys" at the top of the file).
        # sys.stderr.write(format % args)
        pass

    def do_GET(self):
        """Authenticate the request and, on success, send the canned page."""
        # The digest handler reads short_path (the path component of the
        # requested URL) as an alternative to the full request path.
        self.short_path = urllib.parse.urlparse(self.path, "http").path
        if self.digest_auth_handler.handle_request(self):
            self.send_response(200, "OK")
            self.send_header("Content-Type", "text/html")
            self.end_headers()
            self.wfile.write(bytes("You've reached %s!<BR>" % self.path,
                                   "ascii"))
            self.wfile.write(b"Our apologies, but our server is down due to "
                             b"a sudden zombie invasion.")


# Test cases

class BasicAuthTests(unittest.TestCase):
    """Basic authentication against a loopback BasicAuthHandler server."""

    USER = "testUser"
    PASSWD = "testPass"
    INCORRECT_PASSWD = "Incorrect"
    REALM = "Test"

    def setUp(self):
        super().setUp()
        # The tests call install_opener(), which changes module-global
        # state.  In CPython, urlcleanup() also resets the installed
        # opener, so later tests start from a clean slate.
        self.addCleanup(urllib.request.urlcleanup)

        # With Basic Authentication
        def http_server_with_basic_auth_handler(*args, **kwargs):
            return BasicAuthHandler(*args, **kwargs)
        self.server = LoopbackHttpServerThread(http_server_with_basic_auth_handler)
        self.addCleanup(self.stop_server)
        self.server_url = 'http://127.0.0.1:%s' % self.server.port
        self.server.start()
        self.server.ready.wait()

    def stop_server(self):
        self.server.stop()
        self.server = None

    def tearDown(self):
        super().tearDown()

    def test_basic_auth_success(self):
        ah = urllib.request.HTTPBasicAuthHandler()
        ah.add_password(self.REALM, self.server_url, self.USER, self.PASSWD)
        urllib.request.install_opener(urllib.request.build_opener(ah))
        try:
            # Close the response so the connection is not leaked.
            with urllib.request.urlopen(self.server_url) as response:
                self.assertTrue(response)
        except urllib.error.HTTPError:
            self.fail("Basic auth failed for the url: %s" % self.server_url)

    def test_basic_auth_httperror(self):
        ah = urllib.request.HTTPBasicAuthHandler()
        ah.add_password(self.REALM, self.server_url, self.USER, self.INCORRECT_PASSWD)
        urllib.request.install_opener(urllib.request.build_opener(ah))
        with self.assertRaises(urllib.error.HTTPError) as cm:
            urllib.request.urlopen(self.server_url)
        # The HTTPError wraps the error response; release it.
        cm.exception.close()


class ProxyAuthTests(unittest.TestCase):
    """Proxy digest authentication against a loopback FakeProxyHandler."""

    URL = "http://localhost"

    USER = "tester"
    PASSWD = "test123"
    REALM = "TestRealm"

    @support.requires_hashdigest("md5")
    def setUp(self):
        super().setUp()
        # Ignore proxy bypass settings in the environment.
        def restore_environ(old_environ):
            os.environ.clear()
            os.environ.update(old_environ)
        self.addCleanup(restore_environ, os.environ.copy())
        os.environ['NO_PROXY'] = ''
        os.environ['no_proxy'] = ''

        self.digest_auth_handler = DigestAuthHandler()
        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
        self.digest_auth_handler.set_realm(self.REALM)
        # With Digest Authentication.
        def create_fake_proxy_handler(*args, **kwargs):
            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
        self.addCleanup(self.stop_server)
        self.server.start()
        self.server.ready.wait()
        proxy_url = "http://127.0.0.1:%d" % self.server.port
        handler = urllib.request.ProxyHandler({"http" : proxy_url})
        self.proxy_digest_handler = urllib.request.ProxyDigestAuthHandler()
        self.opener = urllib.request.build_opener(
            handler, self.proxy_digest_handler)

    def stop_server(self):
        self.server.stop()
        self.server = None

    def test_proxy_with_bad_password_raises_httperror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD+"bad")
        self.digest_auth_handler.set_qop("auth")
        with self.assertRaises(urllib.error.HTTPError) as cm:
            self.opener.open(self.URL)
        # The HTTPError wraps the error response; release it.
        cm.exception.close()

    def test_proxy_with_no_password_raises_httperror(self):
        self.digest_auth_handler.set_qop("auth")
        with self.assertRaises(urllib.error.HTTPError) as cm:
            self.opener.open(self.URL)
        # The HTTPError wraps the error response; release it.
        cm.exception.close()

    def test_proxy_qop_auth_works(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth")
        with self.opener.open(self.URL) as result:
            while result.read():
                pass

    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth-int")
        try:
            result = self.opener.open(self.URL)
        except urllib.error.URLError:
            # It's okay if we don't support auth-int, but we certainly
            # shouldn't receive any kind of exception here other than
            # a URLError.
            pass
        else:
            with result:
                while result.read():
                    pass


def GetRequestHandler(responses):
    """Return a request handler class that replays *responses*.

    *responses* is a list of ``(status_code, headers, body)`` tuples where
    *headers* is a list of ``(name, value)`` pairs.  One entry is popped
    from the front of the list (mutating it) for every request served.
    Header values are %-formatted with ``{'port': <handler port>}``.

    The returned class records request paths and POST bodies in its
    ``requests`` list and the most recent request headers in
    ``headers_received``.
    """

    class FakeHTTPRequestHandler(http.server.BaseHTTPRequestHandler):

        server_version = "TestHTTP/"
        requests = []
        headers_received = []
        port = 80

        def do_GET(self):
            body = self.send_head()
            # write() may accept only part of the data; keep going until
            # the whole body has been written.
            while body:
                done = self.wfile.write(body)
                body = body[done:]

        def do_POST(self):
            content_length = self.headers["Content-Length"]
            post_data = self.rfile.read(int(content_length))
            self.do_GET()
            self.requests.append(post_data)

        def send_head(self):
            """Send status line and headers; return the body or None."""
            FakeHTTPRequestHandler.headers_received = self.headers
            self.requests.append(self.path)
            response_code, headers, body = responses.pop(0)

            self.send_response(response_code)

            for (header, value) in headers:
                self.send_header(header, value % {'port':self.port})
            if body:
                self.send_header("Content-type", "text/plain")
            self.end_headers()
            return body if body else None

        def log_message(self, *args):
            pass


    return FakeHTTPRequestHandler


class TestUrlopen(unittest.TestCase):
    """Tests urllib.request.urlopen using the network.

    These tests are not exhaustive.  Assuming that testing using files does a
    good job overall of some of the basic interface features.  There are no
    tests exercising the optional 'data' and 'proxies' arguments.  No tests
    for transparent redirection have been written.
    """

    def setUp(self):
        super().setUp()

        # Ignore proxies for localhost tests.
        def restore_environ(old_environ):
            os.environ.clear()
            os.environ.update(old_environ)
        self.addCleanup(restore_environ, os.environ.copy())
        os.environ['NO_PROXY'] = '*'
        os.environ['no_proxy'] = '*'

    def urlopen(self, url, data=None, **kwargs):
        """Open *url*, read it through several methods, return all bytes."""
        l = []
        with urllib.request.urlopen(url, data, **kwargs) as f:
            # Exercise various methods
            l.extend(f.readlines(200))
            l.append(f.readline())
            l.append(f.read(1024))
            l.append(f.read())
        return b"".join(l)

    def stop_server(self):
        self.server.stop()
        self.server = None

    def start_server(self, responses=None):
        """Start a loopback HTTP server and return its handler class."""
        if responses is None:
            responses = [(200, [], b"we don't care")]
        handler = GetRequestHandler(responses)

        self.server = LoopbackHttpServerThread(handler)
        self.addCleanup(self.stop_server)
        self.server.start()
        self.server.ready.wait()
        port = self.server.port
        handler.port = port
        return handler

    def start_https_server(self, responses=None, **kwargs):
        """Start a loopback HTTPS server and return its handler class.

        Skips the calling test when urllib.request has no HTTPSHandler.
        """
        if not hasattr(urllib.request, 'HTTPSHandler'):
            self.skipTest('ssl support required')
        from test.ssl_servers import make_https_server
        if responses is None:
            responses = [(200, [], b"we care a bit")]
        handler = GetRequestHandler(responses)
        server = make_https_server(self, handler_class=handler, **kwargs)
        handler.port = server.port
        return handler

    def test_redirection(self):
        expected_response = b"We got here..."
        responses = [
            (302, [("Location", "http://localhost:%(port)s/somewhere_else")],
             ""),
            (200, [], expected_response)
        ]

        handler = self.start_server(responses)
        data = self.urlopen("http://localhost:%s/" % handler.port)
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/", "/somewhere_else"])

    def test_chunked(self):
        expected_response = b"hello world"
        chunked_start = (
                        b'a\r\n'
                        b'hello worl\r\n'
                        b'1\r\n'
                        b'd\r\n'
                        b'0\r\n'
                        )
        response = [(200, [("Transfer-Encoding", "chunked")], chunked_start)]
        handler = self.start_server(response)
        data = self.urlopen("http://localhost:%s/" % handler.port)
        self.assertEqual(data, expected_response)

    def test_404(self):
        expected_response = b"Bad bad bad..."
        handler = self.start_server([(404, [], expected_response)])

        try:
            self.urlopen("http://localhost:%s/weeble" % handler.port)
        except urllib.error.URLError as f:
            data = f.read()
            f.close()
        else:
            self.fail("404 should raise URLError")

        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/weeble"])

    def test_200(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        data = self.urlopen("http://localhost:%s/bizarre" % handler.port)
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/bizarre"])

    def test_200_with_parameters(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        data = self.urlopen("http://localhost:%s/bizarre" % handler.port,
                             b"get=with_feeling")
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/bizarre", b"get=with_feeling"])

    def test_https(self):
        handler = self.start_https_server()
        context = ssl.create_default_context(cafile=CERT_localhost)
        data = self.urlopen("https://localhost:%s/bizarre" % handler.port, context=context)
        self.assertEqual(data, b"we care a bit")

    def test_https_with_cafile(self):
        handler = self.start_https_server(certfile=CERT_localhost)
        with support.check_warnings(('', DeprecationWarning)):
            # Good cert
            data = self.urlopen("https://localhost:%s/bizarre" % handler.port,
                                cafile=CERT_localhost)
            self.assertEqual(data, b"we care a bit")
            # Bad cert
            with self.assertRaises(urllib.error.URLError):
                self.urlopen("https://localhost:%s/bizarre" % handler.port,
                             cafile=CERT_fakehostname)
            # Good cert, but mismatching hostname
            handler = self.start_https_server(certfile=CERT_fakehostname)
            with self.assertRaises(urllib.error.URLError):
                self.urlopen("https://localhost:%s/bizarre" % handler.port,
                             cafile=CERT_fakehostname)

    def test_https_with_cadefault(self):
        handler = self.start_https_server(certfile=CERT_localhost)
        # Self-signed cert should fail verification with system certificate store
        with support.check_warnings(('', DeprecationWarning)):
            with self.assertRaises(urllib.error.URLError):
                self.urlopen("https://localhost:%s/bizarre" % handler.port,
                             cadefault=True)

    def test_https_sni(self):
        if ssl is None:
            self.skipTest("ssl module required")
        if not ssl.HAS_SNI:
            self.skipTest("SNI support required in OpenSSL")
        sni_name = None
        def cb_sni(ssl_sock, server_name, initial_context):
            nonlocal sni_name
            sni_name = server_name
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.set_servername_callback(cb_sni)
        handler = self.start_https_server(context=context, certfile=CERT_localhost)
        context = ssl.create_default_context(cafile=CERT_localhost)
        self.urlopen("https://localhost:%s" % handler.port, context=context)
        self.assertEqual(sni_name, "localhost")

    def test_sending_headers(self):
        handler = self.start_server()
        req = urllib.request.Request("http://localhost:%s/" % handler.port,
                                     headers={"Range": "bytes=20-39"})
        with urllib.request.urlopen(req):
            pass
        self.assertEqual(handler.headers_received["Range"], "bytes=20-39")

    def test_basic(self):
        handler = self.start_server()
        with urllib.request.urlopen("http://localhost:%s" % handler.port) as open_url:
            for attr in ("read", "close", "info", "geturl"):
                self.assertTrue(hasattr(open_url, attr), "object returned from "
                             "urlopen lacks the %s attribute" % attr)
            self.assertTrue(open_url.read(), "calling 'read' failed")

    def test_info(self):
        handler = self.start_server()
        open_url = urllib.request.urlopen(
            "http://localhost:%s" % handler.port)
        with open_url:
            info_obj = open_url.info()
        self.assertIsInstance(info_obj, email.message.Message,
                              "object returned by 'info' is not an "
                              "instance of email.message.Message")
        self.assertEqual(info_obj.get_content_subtype(), "plain")

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        handler = self.start_server()
        open_url = urllib.request.urlopen("http://localhost:%s" % handler.port)
        with open_url:
            url = open_url.geturl()
        self.assertEqual(url, "http://localhost:%s" % handler.port)

    def test_iteration(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        # Close the response when done so the connection is not leaked.
        with urllib.request.urlopen("http://localhost:%s" % handler.port) as data:
            for line in data:
                self.assertEqual(line, expected_response)

    def test_line_iteration(self):
        lines = [b"We\n", b"got\n", b"here\n", b"verylong " * 8192 + b"\n"]
        expected_response = b"".join(lines)
        handler = self.start_server([(200, [], expected_response)])
        # Counted separately so that an empty response fails the final
        # assertion instead of raising NameError on an unbound loop variable.
        line_count = 0
        with urllib.request.urlopen("http://localhost:%s" % handler.port) as data:
            for index, line in enumerate(data):
                self.assertEqual(line, lines[index],
                                 "Fetched line number %s doesn't match expected:\n"
                                 " Expected length was %s, got %s" %
                                 (index, len(lines[index]), len(line)))
                line_count = index + 1
        self.assertEqual(line_count, len(lines))


threads_key = None

def setUpModule():
    # Store the threading_setup in a key and ensure that it is cleaned up
    # in the tearDown
    global threads_key
    threads_key = support.threading_setup()

def tearDownModule():
    if threads_key:
        support.threading_cleanup(*threads_key)

if __name__ == "__main__":
    unittest.main()