import os
import base64
import urlparse
import urllib2
import BaseHTTPServer
import unittest
import hashlib

from test import test_support

mimetools = test_support.import_module('mimetools', deprecated=True)
threading = test_support.import_module('threading')

try:
    import ssl
except ImportError:
    ssl = None

here = os.path.dirname(__file__)
# Self-signed cert file for 'localhost'
CERT_localhost = os.path.join(here, 'keycert.pem')
# Self-signed cert file for 'fakehostname'
CERT_fakehostname = os.path.join(here, 'keycert2.pem')

# Loopback http server infrastructure

class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
    """HTTP server w/ a few modifications that make it useful for
    loopback testing purposes.
    """

    def __init__(self, server_address, RequestHandlerClass):
        BaseHTTPServer.HTTPServer.__init__(self,
                                           server_address,
                                           RequestHandlerClass)

        # Set the timeout of our listening socket really low so
        # that we can stop the server easily.
        self.socket.settimeout(0.1)

    def get_request(self):
        """BaseHTTPServer method, overridden.

        Accepts one connection and returns ``(request, client_address)``
        after putting a timeout on the accepted socket.
        """

        request, client_address = self.socket.accept()

        # It's a loopback connection, so bounding how long the request
        # socket may block shouldn't affect anything, but should make
        # deadlocks less likely to occur.
        request.settimeout(10.0)

        return (request, client_address)

class LoopbackHttpServerThread(threading.Thread):
    """Stoppable thread that runs a loopback http server."""

    def __init__(self, request_handler):
        """Bind a LoopbackHttpServer to an ephemeral port on 127.0.0.1.

        request_handler is the handler class (or a factory callable
        returning a handler) passed on to the server.  The chosen port is
        published as ``self.port``; ``self.ready`` is set once run() starts.
        """
        threading.Thread.__init__(self)
        self._stop = False
        self.ready = threading.Event()
        request_handler.protocol_version = "HTTP/1.0"
        self.httpd = LoopbackHttpServer(('127.0.0.1', 0),
                                        request_handler)
        #print "Serving HTTP on %s port %s" % (self.httpd.server_name,
        #                                      self.httpd.server_port)
        self.port = self.httpd.server_port

    def stop(self):
        """Stops the webserver if it's currently running."""

        # Set the stop flag.
        self._stop = True

        self.join()

        # The serving loop has exited; release the listening socket so it
        # is not leaked from one test to the next.
        self.httpd.server_close()

    def run(self):
        """Serve requests one at a time until stop() sets the stop flag."""
        self.ready.set()
        while not self._stop:
            self.httpd.handle_request()

# Authentication infrastructure


class BasicAuthHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """Handler for performing Basic Authentication."""
    # Server side values
    USER = "testUser"
    PASSWD = "testPass"
    REALM = "Test"
    USER_PASSWD = "%s:%s" % (USER, PASSWD)
    ENCODED_AUTH = base64.b64encode(USER_PASSWD)

    def __init__(self, *args, **kwargs):
        BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)

    def log_message(self, format, *args):
        # Suppress the HTTP Console log output
        pass

    def do_HEAD(self):
        """Send a bare 200 response with a text/html content type."""
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()

    def do_AUTHHEAD(self):
        """Send a 401 response challenging the client for Basic auth."""
        self.send_response(401)
        self.send_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.REALM)
        self.send_header("Content-type", "text/html")
        self.end_headers()

    def do_GET(self):
        """Answer 200 for the expected credentials, 401 otherwise."""
        auth_header = self.headers.getheader("Authorization")
        if auth_header is None:
            self.do_AUTHHEAD()
            self.wfile.write("No Auth Header Received")
        elif auth_header == "Basic " + self.ENCODED_AUTH:
            # Emit a proper status line and header block before the body;
            # writing the body alone produces a headerless response.
            self.send_response(200)
            self.end_headers()
            self.wfile.write("It works!")
        else:
            # Unauthorized Request
            self.do_AUTHHEAD()


class DigestAuthHandler:
    """Handler for performing digest authentication."""

    def __init__(self):
        self._request_num = 0
        self._nonces = []
        self._users = {}
        self._realm_name = "Test Realm"
        self._qop = "auth"

    def set_qop(self, qop):
        """Set the qop value advertised in the challenge."""
        self._qop = qop

    def set_users(self, users):
        """Set the {username: password} mapping used for validation."""
        assert isinstance(users, dict)
        self._users = users

    def set_realm(self, realm):
        """Set the realm name advertised in the challenge."""
        self._realm_name = realm

    def _generate_nonce(self):
        """Return a fresh nonce and remember it as outstanding."""
        self._request_num += 1
        nonce = hashlib.md5(str(self._request_num)).hexdigest()
        self._nonces.append(nonce)
        return nonce

    def _create_auth_dict(self, auth_str):
        """Parse an authorization header value into a dict.

        The leading scheme word (everything up to the first space) is
        dropped; the remainder is split on commas into name=value pairs.
        Surrounding double quotes are removed from values.
        """
        first_space_index = auth_str.find(" ")
        auth_str = auth_str[first_space_index+1:]

        auth_dict = {}
        for part in auth_str.split(","):
            # Split on the first '=' only, so that values which themselves
            # contain '=' (e.g. a uri with a query string) stay intact.
            name, value = part.split("=", 1)
            value = value.strip()
            if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
                value = value[1:-1]
            auth_dict[name.strip()] = value
        return auth_dict

    def _validate_auth(self, auth_dict, password, method, uri):
        """Return True if auth_dict["response"] matches the digest computed
        from the given password, method and uri.

        The response is MD5(HA1:nonce:nc:cnonce:qop:HA2), with
        HA1 = MD5(username:realm:password) and HA2 = MD5(method:uri).
        """
        final_dict = {}
        final_dict.update(auth_dict)
        final_dict["password"] = password
        final_dict["method"] = method
        final_dict["uri"] = uri
        HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
        HA1 = hashlib.md5(HA1_str).hexdigest()
        HA2_str = "%(method)s:%(uri)s" % final_dict
        HA2 = hashlib.md5(HA2_str).hexdigest()
        final_dict["HA1"] = HA1
        final_dict["HA2"] = HA2
        response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
                       "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
        response = hashlib.md5(response_str).hexdigest()

        return response == auth_dict["response"]

    def _return_auth_challenge(self, request_handler):
        """Send a 407 digest challenge with a fresh nonce; return False."""
        request_handler.send_response(407, "Proxy Authentication Required")
        request_handler.send_header("Content-Type", "text/html")
        request_handler.send_header(
            'Proxy-Authenticate', 'Digest realm="%s", '
            'qop="%s",'
            'nonce="%s", ' % \
            (self._realm_name, self._qop, self._generate_nonce()))
        # XXX: Not sure if we're supposed to add this next header or
        # not.
        #request_handler.send_header('Connection', 'close')
        request_handler.end_headers()
        request_handler.wfile.write("Proxy Authentication Required.")
        return False

    def handle_request(self, request_handler):
        """Performs digest authentication on the given HTTP request
        handler.  Returns True if authentication was successful, False
        otherwise.

        If no users have been set, then digest auth is effectively
        disabled and this method will always return True.
        """

        if not self._users:
            return True

        if 'Proxy-Authorization' not in request_handler.headers:
            return self._return_auth_challenge(request_handler)

        auth_dict = self._create_auth_dict(
            request_handler.headers['Proxy-Authorization']
            )

        # A missing or unknown username is answered with a new challenge
        # rather than an unhandled KeyError inside the server thread.
        username = auth_dict.get("username")
        if username not in self._users:
            return self._return_auth_challenge(request_handler)
        password = self._users[username]

        # Each nonce may be used once only.
        nonce = auth_dict.get("nonce")
        if nonce not in self._nonces:
            return self._return_auth_challenge(request_handler)
        self._nonces.remove(nonce)

        # MSIE uses short_path in its validation, but Python's
        # urllib2 uses the full path, so we're going to see if
        # either of them works here.
        for path in (request_handler.path, request_handler.short_path):
            if self._validate_auth(auth_dict,
                                   password,
                                   request_handler.command,
                                   path):
                return True

        return self._return_auth_challenge(request_handler)

# Proxy test infrastructure

class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """This is a 'fake proxy' that makes it look like the entire
    internet has gone down due to a sudden zombie invasion.  Its main
    utility is in providing us with authentication support for
    testing.
    """

    def __init__(self, digest_auth_handler, *args, **kwargs):
        # This has to be set before calling our parent's __init__(), which will
        # try to call do_GET().
        self.digest_auth_handler = digest_auth_handler
        BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)

    def log_message(self, format, *args):
        # Uncomment the next line for debugging.
        #sys.stderr.write(format % args)
        pass

    def do_GET(self):
        """Authenticate the request, then serve the canned 'down' page."""
        (scm, netloc, path, params, query, fragment) = urlparse.urlparse(
            self.path, 'http')
        # The digest handler tries both self.path and self.short_path.
        self.short_path = path
        if self.digest_auth_handler.handle_request(self):
            self.send_response(200, "OK")
            self.send_header("Content-Type", "text/html")
            self.end_headers()
            self.wfile.write("You've reached %s!<BR>" % self.path)
            self.wfile.write("Our apologies, but our server is down due to "
                              "a sudden zombie invasion.")

# Test cases

class BaseTestCase(unittest.TestCase):
    """Common base: records threads in setUp, cleans them up in tearDown."""

    def setUp(self):
        self._threads = test_support.threading_setup()

    def tearDown(self):
        test_support.threading_cleanup(*self._threads)


class BasicAuthTests(BaseTestCase):
    """Tests HTTPBasicAuthHandler against a loopback BasicAuthHandler."""

    USER = "testUser"
    PASSWD = "testPass"
    INCORRECT_PASSWD = "Incorrect"
    REALM = "Test"

    def setUp(self):
        super(BasicAuthTests, self).setUp()
        # With Basic Authentication
        def http_server_with_basic_auth_handler(*args, **kwargs):
            return BasicAuthHandler(*args, **kwargs)
        self.server = LoopbackHttpServerThread(http_server_with_basic_auth_handler)
        self.server_url = 'http://127.0.0.1:%s' % self.server.port
        self.server.start()
        self.server.ready.wait()

    def tearDown(self):
        self.server.stop()
        super(BasicAuthTests, self).tearDown()

    def test_basic_auth_success(self):
        # Correct credentials must get through without an HTTPError.
        ah = urllib2.HTTPBasicAuthHandler()
        ah.add_password(self.REALM, self.server_url, self.USER, self.PASSWD)
        urllib2.install_opener(urllib2.build_opener(ah))
        try:
            response = urllib2.urlopen(self.server_url)
        except urllib2.HTTPError:
            self.fail("Basic Auth Failed for url: %s" % self.server_url)
        # Any other exception propagates unchanged, traceback intact.
        try:
            self.assertTrue(response)
        finally:
            response.close()

    def test_basic_auth_httperror(self):
        # A wrong password must surface as an HTTPError.
        ah = urllib2.HTTPBasicAuthHandler()
        ah.add_password(self.REALM, self.server_url, self.USER,
                        self.INCORRECT_PASSWD)
        urllib2.install_opener(urllib2.build_opener(ah))
        self.assertRaises(urllib2.HTTPError, urllib2.urlopen, self.server_url)


class ProxyAuthTests(BaseTestCase):
    """Tests ProxyDigestAuthHandler against a loopback FakeProxyHandler."""

    URL = "http://localhost"

    USER = "tester"
    PASSWD = "test123"
    REALM = "TestRealm"

    def setUp(self):
        super(ProxyAuthTests, self).setUp()
        # Ignore proxy bypass settings in the environment.
        def restore_environ(old_environ):
            os.environ.clear()
            os.environ.update(old_environ)
        self.addCleanup(restore_environ, os.environ.copy())
        os.environ['NO_PROXY'] = ''
        os.environ['no_proxy'] = ''

        self.digest_auth_handler = DigestAuthHandler()
        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
        self.digest_auth_handler.set_realm(self.REALM)
        # With Digest Authentication
        def create_fake_proxy_handler(*args, **kwargs):
            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
        self.server.start()
        self.server.ready.wait()
        proxy_url = "http://127.0.0.1:%d" % self.server.port
        handler = urllib2.ProxyHandler({"http" : proxy_url})
        self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()
        self.opener = urllib2.build_opener(handler, self.proxy_digest_handler)

    def tearDown(self):
        self.server.stop()
        super(ProxyAuthTests, self).tearDown()

    def test_proxy_with_bad_password_raises_httperror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD+"bad")
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_with_no_password_raises_httperror(self):
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_qop_auth_works(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth")
        result = self.opener.open(self.URL)
        # Drain the response before closing it.
        while result.read():
            pass
        result.close()

    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth-int")
        try:
            result = self.opener.open(self.URL)
        except urllib2.URLError:
            # It's okay if we don't support auth-int, but we certainly
            # shouldn't receive any kind of exception here other than
            # a URLError.
            result = None
        if result:
            while result.read():
                pass
            result.close()


def GetRequestHandler(responses):
    """Return a new request handler class that replays canned responses.

    responses is a list of (status_code, headers, body) tuples; one tuple
    is popped from the front for every request served.  Header values are
    %-formatted with the handler's ``port`` attribute.  The returned class
    records request paths and POST bodies in ``requests`` and the headers
    of the most recent request in ``headers_received``.
    """

    class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):

        server_version = "TestHTTP/"
        requests = []
        headers_received = []
        port = 80

        def do_GET(self):
            body = self.send_head()
            if body:
                self.wfile.write(body)

        def do_POST(self):
            content_length = self.headers['Content-Length']
            post_data = self.rfile.read(int(content_length))
            self.do_GET()
            self.requests.append(post_data)

        def send_head(self):
            # Returns the body to send, or None when the canned body is
            # empty.
            FakeHTTPRequestHandler.headers_received = self.headers
            self.requests.append(self.path)
            response_code, headers, body = responses.pop(0)

            self.send_response(response_code)

            for (header, value) in headers:
                self.send_header(header, value % self.port)
            if body:
                self.send_header('Content-type', 'text/plain')
                self.end_headers()
                return body
            self.end_headers()

        def log_message(self, *args):
            pass


    return FakeHTTPRequestHandler


class TestUrlopen(BaseTestCase):
    """Tests urllib2.urlopen using the network.

    These tests are not exhaustive.  Assuming that testing using files does a
    good job overall of some of the basic interface features.  There are no
    tests exercising the optional 'data' and 'proxies' arguments.  No tests
    for transparent redirection have been written.
    """

    def setUp(self):
        # Install an opener with an empty proxy map before each test.
        proxy_handler = urllib2.ProxyHandler({})
        opener = urllib2.build_opener(proxy_handler)
        urllib2.install_opener(opener)
        super(TestUrlopen, self).setUp()

    def urlopen(self, url, data=None, **kwargs):
        """Open url and return the whole body, read via several methods."""
        l = []
        f = urllib2.urlopen(url, data, **kwargs)
        try:
            # Exercise various methods
            l.extend(f.readlines(200))
            l.append(f.readline())
            l.append(f.read(1024))
            l.append(f.read())
        finally:
            f.close()
        return b"".join(l)

    def start_server(self, responses):
        """Start a loopback HTTP server replaying responses.

        Returns the handler class; the running thread is kept in
        self.server and must be stopped by the caller.
        """
        handler = GetRequestHandler(responses)

        self.server = LoopbackHttpServerThread(handler)
        self.server.start()
        self.server.ready.wait()
        port = self.server.port
        handler.port = port
        return handler

    def start_https_server(self, responses=None, **kwargs):
        """Start an HTTPS server via test.ssl_servers; return the handler.

        Skips the calling test when urllib2 has no HTTPSHandler.
        """
        if not hasattr(urllib2, 'HTTPSHandler'):
            self.skipTest('ssl support required')
        from test.ssl_servers import make_https_server
        if responses is None:
            responses = [(200, [], b"we care a bit")]
        handler = GetRequestHandler(responses)
        server = make_https_server(self, handler_class=handler, **kwargs)
        handler.port = server.port
        return handler

    def test_redirection(self):
        expected_response = 'We got here...'
        responses = [
            (302, [('Location', 'http://localhost:%s/somewhere_else')], ''),
            (200, [], expected_response)
        ]

        handler = self.start_server(responses)

        try:
            f = urllib2.urlopen('http://localhost:%s/' % handler.port)
            data = f.read()
            f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/', '/somewhere_else'])
        finally:
            self.server.stop()


    def test_404(self):
        expected_response = 'Bad bad bad...'
        handler = self.start_server([(404, [], expected_response)])

        try:
            try:
                urllib2.urlopen('http://localhost:%s/weeble' % handler.port)
            except urllib2.URLError as err:
                # Keep the error under a separate name: its body is read
                # after the except clause has finished.
                f = err
            else:
                self.fail('404 should raise URLError')

            data = f.read()
            f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/weeble'])
        finally:
            self.server.stop()


    def test_200(self):
        expected_response = 'pycon 2008...'
        handler = self.start_server([(200, [], expected_response)])

        try:
            f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port)
            data = f.read()
            f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/bizarre'])
        finally:
            self.server.stop()

    def test_200_with_parameters(self):
        expected_response = 'pycon 2008...'
        handler = self.start_server([(200, [], expected_response)])

        try:
            f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port, 'get=with_feeling')
            data = f.read()
            f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/bizarre', 'get=with_feeling'])
        finally:
            self.server.stop()

    def test_https(self):
        handler = self.start_https_server()
        context = ssl.create_default_context(cafile=CERT_localhost)
        data = self.urlopen("https://localhost:%s/bizarre" % handler.port, context=context)
        self.assertEqual(data, b"we care a bit")

    def test_https_with_cafile(self):
        handler = self.start_https_server(certfile=CERT_localhost)
        # Good cert
        data = self.urlopen("https://localhost:%s/bizarre" % handler.port,
                            cafile=CERT_localhost)
        self.assertEqual(data, b"we care a bit")
        # Bad cert
        with self.assertRaises(urllib2.URLError):
            self.urlopen("https://localhost:%s/bizarre" % handler.port,
                         cafile=CERT_fakehostname)
        # Good cert, but mismatching hostname
        handler = self.start_https_server(certfile=CERT_fakehostname)
        with self.assertRaises(ssl.CertificateError):
            self.urlopen("https://localhost:%s/bizarre" % handler.port,
                         cafile=CERT_fakehostname)

    def test_https_with_cadefault(self):
        handler = self.start_https_server(certfile=CERT_localhost)
        # Self-signed cert should fail verification with system certificate store
        with self.assertRaises(urllib2.URLError):
            self.urlopen("https://localhost:%s/bizarre" % handler.port,
                         cadefault=True)

    def test_https_sni(self):
        if ssl is None:
            self.skipTest("ssl module required")
        if not ssl.HAS_SNI:
            self.skipTest("SNI support required in OpenSSL")
        # One-element list so the callback can record the name it sees.
        sni_name = [None]
        def cb_sni(ssl_sock, server_name, initial_context):
            sni_name[0] = server_name
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.set_servername_callback(cb_sni)
        handler = self.start_https_server(context=context, certfile=CERT_localhost)
        context = ssl.create_default_context(cafile=CERT_localhost)
        self.urlopen("https://localhost:%s" % handler.port, context=context)
        self.assertEqual(sni_name[0], "localhost")

    def test_sending_headers(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            req = urllib2.Request("http://localhost:%s/" % handler.port,
                                  headers={'Range': 'bytes=20-39'})
            urllib2.urlopen(req)
            self.assertEqual(handler.headers_received['Range'], 'bytes=20-39')
        finally:
            self.server.stop()

    def test_basic(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            for attr in ("read", "close", "info", "geturl"):
                self.assertTrue(hasattr(open_url, attr), "object returned from "
                             "urlopen lacks the %s attribute" % attr)
            try:
                self.assertTrue(open_url.read(), "calling 'read' failed")
            finally:
                open_url.close()
        finally:
            self.server.stop()

    def test_info(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            info_obj = open_url.info()
            self.assertIsInstance(info_obj, mimetools.Message,
                                  "object returned by 'info' is not an "
                                  "instance of mimetools.Message")
            self.assertEqual(info_obj.getsubtype(), "plain")
        finally:
            self.server.stop()

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            url = open_url.geturl()
            self.assertEqual(url, "http://localhost:%s" % handler.port)
        finally:
            self.server.stop()


    def test_bad_address(self):
        # Make sure proper exception is raised when connecting to a bogus
        # address.

        # as indicated by the comment below, this might fail with some ISP,
        # so we run the test only when -unetwork/-uall is specified to
        # mitigate the problem a bit (see #17564)
        test_support.requires('network')
        self.assertRaises(IOError,
                          # Given that both VeriSign and various ISPs have in
                          # the past or are presently hijacking various invalid
                          # domain name requests in an attempt to boost traffic
                          # to their own sites, finding a domain name to use
                          # for this test is difficult.  RFC2606 leads one to
                          # believe that '.invalid' should work, but experience
                          # seemed to indicate otherwise.  Single character
                          # TLDs are likely to remain invalid, so this seems to
                          # be the best choice. The trailing '.' prevents a
                          # related problem: The normal DNS resolver appends
                          # the domain names from the search path if there is
                          # no '.' at the end, and if one of those domains
                          # implements a '*' rule a result is returned.
                          # However, none of this will prevent the test from
                          # failing if the ISP hijacks all invalid domain
                          # requests.  The real solution would be to be able to
                          # parameterize the framework with a mock resolver.
                          urllib2.urlopen, "http://sadflkjsasf.i.nvali.d./")

    def test_iteration(self):
        expected_response = "pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        try:
            data = urllib2.urlopen("http://localhost:%s" % handler.port)
            for line in data:
                self.assertEqual(line, expected_response)
        finally:
            self.server.stop()

    def ztest_line_iteration(self):
        # NOTE(review): the "ztest" name keeps this out of the default
        # unittest collection; presumably disabled on purpose -- confirm
        # before renaming.
        lines = ["We\n", "got\n", "here\n", "verylong " * 8192 + "\n"]
        expected_response = "".join(lines)
        handler = self.start_server([(200, [], expected_response)])
        # Start at -1 so an empty response fails the final count check
        # instead of raising NameError on an unbound loop variable.
        index = -1
        try:
            data = urllib2.urlopen("http://localhost:%s" % handler.port)
            for index, line in enumerate(data):
                self.assertEqual(line, lines[index],
                                 "Fetched line number %s doesn't match expected:\n"
                                 "    Expected length was %s, got %s" %
                                 (index, len(lines[index]), len(line)))
        finally:
            self.server.stop()
        self.assertEqual(index + 1, len(lines))

def test_main():
    # We will NOT depend on the network resource flag
    # (Lib/test/regrtest.py -u network) since all tests here are only
    # localhost.  However, if this is a bad rationale, then uncomment
    # the next line.
    #test_support.requires("network")

    test_support.run_unittest(BasicAuthTests, ProxyAuthTests, TestUrlopen)

if __name__ == "__main__":
    test_main()