import os
import base64
import urlparse
import urllib2
import BaseHTTPServer
import unittest
import hashlib

from test import test_support

mimetools = test_support.import_module('mimetools', deprecated=True)
threading = test_support.import_module('threading')

try:
    import ssl
except ImportError:
    ssl = None

here = os.path.dirname(__file__)
# Self-signed cert file for 'localhost'
CERT_localhost = os.path.join(here, 'keycert.pem')
# Self-signed cert file for 'fakehostname'
CERT_fakehostname = os.path.join(here, 'keycert2.pem')

# Loopback http server infrastructure

class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
    """HTTP server w/ a few modifications that make it useful for
    loopback testing purposes.
    """

    def __init__(self, server_address, RequestHandlerClass):
        BaseHTTPServer.HTTPServer.__init__(self,
                                           server_address,
                                           RequestHandlerClass)

        # Set the timeout of our listening socket really low so
        # that we can stop the server easily.
        self.socket.settimeout(0.1)

    def get_request(self):
        """BaseHTTPServer method, overridden.

        Accepts one connection and returns ``(request, client_address)``
        with a timeout applied to the accepted socket.
        """
        request, client_address = self.socket.accept()

        # Put an upper bound on blocking reads/writes of the accepted
        # connection so that a stalled peer makes the test fail instead
        # of deadlocking it.
        request.settimeout(10.0)

        return (request, client_address)


class LoopbackHttpServerThread(threading.Thread):
    """Stoppable thread that runs a loopback http server."""

    def __init__(self, request_handler):
        """Bind a LoopbackHttpServer to an ephemeral port on 127.0.0.1.

        request_handler is the handler class (or factory callable) passed
        to the server; its protocol_version attribute is set to HTTP/1.0.
        The bound port is exposed as self.port.
        """
        threading.Thread.__init__(self)
        self._stop = False
        self.ready = threading.Event()
        request_handler.protocol_version = "HTTP/1.0"
        self.httpd = LoopbackHttpServer(('127.0.0.1', 0),
                                        request_handler)
        #print "Serving HTTP on %s port %s" % (self.httpd.server_name,
        #                                      self.httpd.server_port)
        self.port = self.httpd.server_port

    def stop(self):
        """Stops the webserver if it's currently running."""

        # Set the stop flag; run() checks it between requests.
        self._stop = True

        self.join()

        # Release the listening socket now that the serving loop has
        # exited, so that tests do not leak one socket per server.
        self.httpd.server_close()

    def run(self):
        """Serve requests one at a time until stop() is called."""
        self.ready.set()
        while not self._stop:
            self.httpd.handle_request()

# Authentication infrastructure


class BasicAuthHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """Handler for performing Basic Authentication."""
    # Server side values
    USER = "testUser"
    PASSWD = "testPass"
    REALM = "Test"
    USER_PASSWD = "%s:%s" % (USER, PASSWD)
    ENCODED_AUTH = base64.b64encode(USER_PASSWD)

    def log_message(self, format, *args):
        # Suppress the HTTP Console log output
        pass

    def do_HEAD(self):
        """Send a 200 status line and headers without a body."""
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()

    def do_AUTHHEAD(self):
        """Send a 401 challenge advertising Basic auth for REALM."""
        self.send_response(401)
        self.send_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.REALM)
        self.send_header("Content-type", "text/html")
        self.end_headers()

    def do_GET(self):
        """Answer 200 for the expected credentials, 401 otherwise."""
        auth_header = self.headers.getheader("Authorization")
        if auth_header is None:
            self.do_AUTHHEAD()
            self.wfile.write("No Auth Header Received")
        elif auth_header == "Basic " + self.ENCODED_AUTH:
            # Send a real status line and headers before the body; a bare
            # body is only accepted by clients that still fall back to
            # HTTP/0.9 "simple responses".
            self.do_HEAD()
            self.wfile.write("It works!")
        else:
            # Unauthorized Request
            self.do_AUTHHEAD()


class DigestAuthHandler:
    """Handler for performing digest authentication."""

    # Fields that _validate_auth() needs in order to recompute the digest.
    _REQUIRED_FIELDS = ("username", "realm", "nonce", "nc", "cnonce",
                        "qop", "response")

    def __init__(self):
        self._request_num = 0
        self._nonces = []
        self._users = {}
        self._realm_name = "Test Realm"
        self._qop = "auth"

    def set_qop(self, qop):
        self._qop = qop

    def set_users(self, users):
        assert isinstance(users, dict)
        self._users = users

    def set_realm(self, realm):
        self._realm_name = realm

    def _generate_nonce(self):
        """Return a new nonce and remember it as outstanding."""
        self._request_num += 1
        nonce = hashlib.md5(str(self._request_num)).hexdigest()
        self._nonces.append(nonce)
        return nonce

    def _create_auth_dict(self, auth_str):
        """Parse a 'Digest k1="v1", k2=v2, ...' header value into a dict.

        Surrounding whitespace and one pair of enclosing double quotes are
        removed from each value.  Fragments without a name or without an
        '=' (for instance the empty piece after a trailing comma) are
        ignored.

        NOTE: fields are separated with a plain split on ',', so a quoted
        value that itself contains a comma is not supported.
        """
        # Drop the leading auth-scheme token ("Digest ").
        first_space_index = auth_str.find(" ")
        auth_str = auth_str[first_space_index+1:]

        auth_dict = {}
        for part in auth_str.split(","):
            # Split on the first '=' only: values such as a uri with a
            # query string legitimately contain '=' themselves.
            name, sep, value = part.partition("=")
            name = name.strip()
            if not sep or not name:
                continue
            value = value.strip()
            if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
                value = value[1:-1]
            auth_dict[name] = value
        return auth_dict

    def _validate_auth(self, auth_dict, password, method, uri):
        """Return True if auth_dict["response"] matches the MD5 digest
        recomputed from password, method and uri.

        Returns False when any field needed for the computation is
        missing from auth_dict.
        """
        for field in self._REQUIRED_FIELDS:
            if field not in auth_dict:
                return False

        final_dict = {}
        final_dict.update(auth_dict)
        final_dict["password"] = password
        final_dict["method"] = method
        final_dict["uri"] = uri
        HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
        HA1 = hashlib.md5(HA1_str).hexdigest()
        HA2_str = "%(method)s:%(uri)s" % final_dict
        HA2 = hashlib.md5(HA2_str).hexdigest()
        final_dict["HA1"] = HA1
        final_dict["HA2"] = HA2
        response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
                       "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
        response = hashlib.md5(response_str).hexdigest()

        return response == auth_dict["response"]

    def _return_auth_challenge(self, request_handler):
        """Send a 407 digest challenge with a fresh nonce; return False."""
        request_handler.send_response(407, "Proxy Authentication Required")
        request_handler.send_header("Content-Type", "text/html")
        request_handler.send_header(
            'Proxy-Authenticate', 'Digest realm="%s", '
            'qop="%s",'
            'nonce="%s", ' % \
            (self._realm_name, self._qop, self._generate_nonce()))
        # XXX: Not sure if we're supposed to add this next header or
        # not.
        #request_handler.send_header('Connection', 'close')
        request_handler.end_headers()
        request_handler.wfile.write("Proxy Authentication Required.")
        return False

    def handle_request(self, request_handler):
        """Performs digest authentication on the given HTTP request
        handler.  Returns True if authentication was successful, False
        otherwise.

        If no users have been set, then digest auth is effectively
        disabled and this method will always return True.
        """

        if not self._users:
            return True

        if 'Proxy-Authorization' not in request_handler.headers:
            return self._return_auth_challenge(request_handler)

        auth_dict = self._create_auth_dict(
            request_handler.headers['Proxy-Authorization']
            )

        # An unknown or missing user name gets a new challenge.
        username = auth_dict.get("username")
        if username not in self._users:
            return self._return_auth_challenge(request_handler)
        password = self._users[username]

        # Each nonce may be used once: it is forgotten as soon as it is
        # presented, whether or not the digest turns out to be valid.
        nonce = auth_dict.get("nonce")
        if nonce not in self._nonces:
            return self._return_auth_challenge(request_handler)
        self._nonces.remove(nonce)

        # MSIE uses short_path in its validation, but Python's
        # urllib2 uses the full path, so we're going to see if
        # either of them works here.
        for path in (request_handler.path, request_handler.short_path):
            if self._validate_auth(auth_dict,
                                   password,
                                   request_handler.command,
                                   path):
                return True

        return self._return_auth_challenge(request_handler)

# Proxy test infrastructure

class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """This is a 'fake proxy' that makes it look like the entire
    internet has gone down due to a sudden zombie invasion.  Its main
    utility is in providing us with authentication support for
    testing.
    """

    def __init__(self, digest_auth_handler, *args, **kwargs):
        # This has to be set before calling our parent's __init__(), which will
        # try to call do_GET().
        self.digest_auth_handler = digest_auth_handler
        BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)

    def log_message(self, format, *args):
        # Uncomment the next line (and import sys) for debugging.
        #sys.stderr.write(format % args)
        pass

    def do_GET(self):
        """Serve the apology page once digest authentication succeeds."""
        (scm, netloc, path, params, query, fragment) = urlparse.urlparse(
            self.path, 'http')
        # The digest handler also tries the path-only form of the URL.
        self.short_path = path
        if self.digest_auth_handler.handle_request(self):
            self.send_response(200, "OK")
            self.send_header("Content-Type", "text/html")
            self.end_headers()
            self.wfile.write("You've reached %s!<BR>" % self.path)
            self.wfile.write("Our apologies, but our server is down due to "
                             "a sudden zombie invasion.")

# Test cases

class BaseTestCase(unittest.TestCase):
    """Base class that checks no threads are left over after a test."""

    def setUp(self):
        self._threads = test_support.threading_setup()

    def tearDown(self):
        # Run the cleanups (which stop the servers) before checking
        # for dangling threads.
        self.doCleanups()
        test_support.threading_cleanup(*self._threads)


class BasicAuthTests(BaseTestCase):
    """Tests urllib2's HTTPBasicAuthHandler against a loopback server."""

    USER = "testUser"
    PASSWD = "testPass"
    INCORRECT_PASSWD = "Incorrect"
    REALM = "Test"

    def setUp(self):
        super(BasicAuthTests, self).setUp()
        # With Basic Authentication
        self.server = LoopbackHttpServerThread(BasicAuthHandler)
        self.server_url = 'http://127.0.0.1:%s' % self.server.port
        self.server.start()
        self.server.ready.wait()
        self.addCleanup(self.server.stop)

    def test_basic_auth_success(self):
        ah = urllib2.HTTPBasicAuthHandler()
        ah.add_password(self.REALM, self.server_url, self.USER, self.PASSWD)
        urllib2.install_opener(urllib2.build_opener(ah))
        # Only HTTPError is translated into a test failure; any other
        # exception propagates with its original traceback.
        try:
            response = urllib2.urlopen(self.server_url)
        except urllib2.HTTPError:
            self.fail("Basic Auth Failed for url: %s" % self.server_url)
        try:
            self.assertTrue(response)
        finally:
            response.close()

    def test_basic_auth_httperror(self):
        ah = urllib2.HTTPBasicAuthHandler()
        ah.add_password(self.REALM, self.server_url, self.USER,
                        self.INCORRECT_PASSWD)
        urllib2.install_opener(urllib2.build_opener(ah))
        self.assertRaises(urllib2.HTTPError, urllib2.urlopen, self.server_url)


class ProxyAuthTests(BaseTestCase):
    """Tests urllib2's ProxyDigestAuthHandler against the fake proxy."""

    URL = "http://localhost"

    USER = "tester"
    PASSWD = "test123"
    REALM = "TestRealm"

    def setUp(self):
        super(ProxyAuthTests, self).setUp()
        # Ignore proxy bypass settings in the environment.
        def restore_environ(old_environ):
            os.environ.clear()
            os.environ.update(old_environ)
        self.addCleanup(restore_environ, os.environ.copy())
        os.environ['NO_PROXY'] = ''
        os.environ['no_proxy'] = ''

        self.digest_auth_handler = DigestAuthHandler()
        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
        self.digest_auth_handler.set_realm(self.REALM)
        # With Digest Authentication
        def create_fake_proxy_handler(*args, **kwargs):
            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
        self.server.start()
        self.server.ready.wait()
        self.addCleanup(self.server.stop)
        proxy_url = "http://127.0.0.1:%d" % self.server.port
        handler = urllib2.ProxyHandler({"http" : proxy_url})
        self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()
        self.opener = urllib2.build_opener(handler, self.proxy_digest_handler)

    def test_proxy_with_bad_password_raises_httperror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD+"bad")
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_with_no_password_raises_httperror(self):
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_qop_auth_works(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth")
        result = self.opener.open(self.URL)
        try:
            while result.read():
                pass
        finally:
            result.close()

    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth-int")
        try:
            result = self.opener.open(self.URL)
        except urllib2.URLError:
            # It's okay if we don't support auth-int, but we certainly
            # shouldn't receive any kind of exception here other than
            # a URLError.
            result = None
        if result:
            try:
                while result.read():
                    pass
            finally:
                result.close()


def GetRequestHandler(responses):
    """Return a new request handler class that replays canned responses.

    responses is a list of (status_code, headers, body) tuples; one entry
    is popped from the front for every request served.  headers is a list
    of (name, value) pairs, and every value is %-formatted with the
    handler's port attribute before being sent.

    The returned class records request paths (and POST bodies) in its
    requests list and the most recent request headers in
    headers_received.
    """

    class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):

        server_version = "TestHTTP/"
        requests = []
        headers_received = []
        port = 80

        def do_GET(self):
            body = self.send_head()
            if body:
                self.wfile.write(body)

        def do_POST(self):
            content_length = self.headers['Content-Length']
            post_data = self.rfile.read(int(content_length))
            self.do_GET()
            self.requests.append(post_data)

        def send_head(self):
            """Send status line and headers; return the body or None."""
            FakeHTTPRequestHandler.headers_received = self.headers
            self.requests.append(self.path)
            response_code, headers, body = responses.pop(0)

            self.send_response(response_code)

            for (header, value) in headers:
                self.send_header(header, value % self.port)
            if body:
                self.send_header('Content-type', 'text/plain')
            self.end_headers()
            return body or None

        def log_message(self, *args):
            pass


    return FakeHTTPRequestHandler


class TestUrlopen(BaseTestCase):
    """Tests urllib2.urlopen using the network.

    These tests are not exhaustive.  Assuming that testing using files does a
    good job overall of some of the basic interface features.  There are no
    tests exercising the optional 'data' and 'proxies' arguments.  No tests
    for transparent redirection have been written.
    """

    def setUp(self):
        # Install an opener with an empty proxy map for these tests.
        proxy_handler = urllib2.ProxyHandler({})
        opener = urllib2.build_opener(proxy_handler)
        urllib2.install_opener(opener)
        super(TestUrlopen, self).setUp()

    def urlopen(self, url, data=None, **kwargs):
        """Open url, read it through several read methods, and return
        the concatenated content."""
        l = []
        f = urllib2.urlopen(url, data, **kwargs)
        try:
            # Exercise various methods
            l.extend(f.readlines(200))
            l.append(f.readline())
            l.append(f.read(1024))
            l.append(f.read())
        finally:
            f.close()
        return b"".join(l)

    def start_server(self, responses):
        """Start a loopback HTTP server replaying responses; return the
        handler class, whose port attribute is the server's port."""
        handler = GetRequestHandler(responses)

        self.server = LoopbackHttpServerThread(handler)
        self.server.start()
        self.server.ready.wait()
        self.addCleanup(self.server.stop)
        port = self.server.port
        handler.port = port
        return handler

    def start_https_server(self, responses=None, **kwargs):
        """Start a loopback HTTPS server (skipping the test when urllib2
        has no HTTPSHandler); return the handler class."""
        if not hasattr(urllib2, 'HTTPSHandler'):
            self.skipTest('ssl support required')
        from test.ssl_servers import make_https_server
        if responses is None:
            responses = [(200, [], b"we care a bit")]
        handler = GetRequestHandler(responses)
        server = make_https_server(self, handler_class=handler, **kwargs)
        handler.port = server.port
        return handler

    def test_redirection(self):
        expected_response = 'We got here...'
        responses = [
            (302, [('Location', 'http://localhost:%s/somewhere_else')], ''),
            (200, [], expected_response)
        ]

        handler = self.start_server(responses)

        f = urllib2.urlopen('http://localhost:%s/' % handler.port)
        try:
            data = f.read()
        finally:
            f.close()

        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ['/', '/somewhere_else'])


    def test_404(self):
        expected_response = 'Bad bad bad...'
        handler = self.start_server([(404, [], expected_response)])

        try:
            urllib2.urlopen('http://localhost:%s/weeble' % handler.port)
        except urllib2.URLError as exc:
            # Keep the error around: it carries the response body.
            error = exc
        else:
            self.fail('404 should raise URLError')

        try:
            data = error.read()
        finally:
            error.close()

        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ['/weeble'])


    def test_200(self):
        expected_response = 'pycon 2008...'
        handler = self.start_server([(200, [], expected_response)])

        f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port)
        try:
            data = f.read()
        finally:
            f.close()

        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ['/bizarre'])

    def test_200_with_parameters(self):
        expected_response = 'pycon 2008...'
        handler = self.start_server([(200, [], expected_response)])

        f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port, 'get=with_feeling')
        try:
            data = f.read()
        finally:
            f.close()

        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ['/bizarre', 'get=with_feeling'])

    def test_https(self):
        handler = self.start_https_server()
        context = ssl.create_default_context(cafile=CERT_localhost)
        data = self.urlopen("https://localhost:%s/bizarre" % handler.port, context=context)
        self.assertEqual(data, b"we care a bit")

    def test_https_with_cafile(self):
        handler = self.start_https_server(certfile=CERT_localhost)
        # Good cert
        data = self.urlopen("https://localhost:%s/bizarre" % handler.port,
                            cafile=CERT_localhost)
        self.assertEqual(data, b"we care a bit")
        # Bad cert
        with self.assertRaises(urllib2.URLError):
            self.urlopen("https://localhost:%s/bizarre" % handler.port,
                         cafile=CERT_fakehostname)
        # Good cert, but mismatching hostname
        handler = self.start_https_server(certfile=CERT_fakehostname)
        with self.assertRaises(ssl.CertificateError):
            self.urlopen("https://localhost:%s/bizarre" % handler.port,
                         cafile=CERT_fakehostname)

    def test_https_with_cadefault(self):
        handler = self.start_https_server(certfile=CERT_localhost)
        # Self-signed cert should fail verification with system certificate store
        with self.assertRaises(urllib2.URLError):
            self.urlopen("https://localhost:%s/bizarre" % handler.port,
                         cadefault=True)

    def test_https_sni(self):
        if ssl is None:
            self.skipTest("ssl module required")
        if not ssl.HAS_SNI:
            self.skipTest("SNI support required in OpenSSL")
        # One-element list so the nested callback can store its result.
        sni_name = [None]
        def cb_sni(ssl_sock, server_name, initial_context):
            sni_name[0] = server_name
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        context.set_servername_callback(cb_sni)
        handler = self.start_https_server(context=context, certfile=CERT_localhost)
        context = ssl.create_default_context(cafile=CERT_localhost)
        self.urlopen("https://localhost:%s" % handler.port, context=context)
        self.assertEqual(sni_name[0], "localhost")

    def test_sending_headers(self):
        handler = self.start_server([(200, [], "we don't care")])

        req = urllib2.Request("http://localhost:%s/" % handler.port,
                              headers={'Range': 'bytes=20-39'})
        urllib2.urlopen(req).close()
        self.assertEqual(handler.headers_received['Range'], 'bytes=20-39')

    def test_basic(self):
        handler = self.start_server([(200, [], "we don't care")])

        open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
        try:
            for attr in ("read", "close", "info", "geturl"):
                self.assertTrue(hasattr(open_url, attr), "object returned from "
                                "urlopen lacks the %s attribute" % attr)
            self.assertTrue(open_url.read(), "calling 'read' failed")
        finally:
            open_url.close()

    def test_info(self):
        handler = self.start_server([(200, [], "we don't care")])

        open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
        try:
            info_obj = open_url.info()
        finally:
            open_url.close()
        self.assertIsInstance(info_obj, mimetools.Message,
                              "object returned by 'info' is not an "
                              "instance of mimetools.Message")
        self.assertEqual(info_obj.getsubtype(), "plain")

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        handler = self.start_server([(200, [], "we don't care")])

        open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
        try:
            url = open_url.geturl()
        finally:
            open_url.close()
        self.assertEqual(url, "http://localhost:%s" % handler.port)


    def test_bad_address(self):
        # Make sure proper exception is raised when connecting to a bogus
        # address.

        # as indicated by the comment below, this might fail with some ISP,
        # so we run the test only when -unetwork/-uall is specified to
        # mitigate the problem a bit (see #17564)
        test_support.requires('network')
        self.assertRaises(IOError,
                          # Given that both VeriSign and various ISPs have in
                          # the past or are presently hijacking various invalid
                          # domain name requests in an attempt to boost traffic
                          # to their own sites, finding a domain name to use
                          # for this test is difficult.  RFC2606 leads one to
                          # believe that '.invalid' should work, but experience
                          # seemed to indicate otherwise.  Single character
                          # TLDs are likely to remain invalid, so this seems to
                          # be the best choice. The trailing '.' prevents a
                          # related problem: The normal DNS resolver appends
                          # the domain names from the search path if there is
                          # no '.' at the end, and if one of those domains
                          # implements a '*' rule a result is returned.
                          # However, none of this will prevent the test from
                          # failing if the ISP hijacks all invalid domain
                          # requests.  The real solution would be to be able to
                          # parameterize the framework with a mock resolver.
                          urllib2.urlopen, "http://sadflkjsasf.i.nvali.d./")

    def test_iteration(self):
        expected_response = "pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])

        data = urllib2.urlopen("http://localhost:%s" % handler.port)
        try:
            for line in data:
                self.assertEqual(line, expected_response)
        finally:
            data.close()

    # The 'z' prefix keeps unittest from collecting this test.
    def ztest_line_iteration(self):
        lines = ["We\n", "got\n", "here\n", "verylong " * 8192 + "\n"]
        expected_response = "".join(lines)
        handler = self.start_server([(200, [], expected_response)])
        data = urllib2.urlopen("http://localhost:%s" % handler.port)
        try:
            for index, line in enumerate(data):
                self.assertEqual(line, lines[index],
                                 "Fetched line number %s doesn't match expected:\n"
                                 "    Expected length was %s, got %s" %
                                 (index, len(lines[index]), len(line)))
        finally:
            data.close()
        self.assertEqual(index + 1, len(lines))

def test_main():
    # We will NOT depend on the network resource flag
    # (Lib/test/regrtest.py -u network) since all tests here are only
    # localhost.  However, if this is a bad rationale, then uncomment
    # the next line.
    #test_support.requires("network")

    test_support.run_unittest(BasicAuthTests, ProxyAuthTests, TestUrlopen)

if __name__ == "__main__":
    test_main()