1import base64 2import os 3import email 4import urllib.parse 5import urllib.request 6import http.server 7import unittest 8import hashlib 9 10from test import support 11 12threading = support.import_module('threading') 13 14try: 15 import ssl 16except ImportError: 17 ssl = None 18 19here = os.path.dirname(__file__) 20# Self-signed cert file for 'localhost' 21CERT_localhost = os.path.join(here, 'keycert.pem') 22# Self-signed cert file for 'fakehostname' 23CERT_fakehostname = os.path.join(here, 'keycert2.pem') 24 25 26# Loopback http server infrastructure 27 28class LoopbackHttpServer(http.server.HTTPServer): 29 """HTTP server w/ a few modifications that make it useful for 30 loopback testing purposes. 31 """ 32 33 def __init__(self, server_address, RequestHandlerClass): 34 http.server.HTTPServer.__init__(self, 35 server_address, 36 RequestHandlerClass) 37 38 # Set the timeout of our listening socket really low so 39 # that we can stop the server easily. 40 self.socket.settimeout(0.1) 41 42 def get_request(self): 43 """HTTPServer method, overridden.""" 44 45 request, client_address = self.socket.accept() 46 47 # It's a loopback connection, so setting the timeout 48 # really low shouldn't affect anything, but should make 49 # deadlocks less likely to occur. 50 request.settimeout(10.0) 51 52 return (request, client_address) 53 54class LoopbackHttpServerThread(threading.Thread): 55 """Stoppable thread that runs a loopback http server.""" 56 57 def __init__(self, request_handler): 58 threading.Thread.__init__(self) 59 self._stop_server = False 60 self.ready = threading.Event() 61 request_handler.protocol_version = "HTTP/1.0" 62 self.httpd = LoopbackHttpServer(("127.0.0.1", 0), 63 request_handler) 64 self.port = self.httpd.server_port 65 66 def stop(self): 67 """Stops the webserver if it's currently running.""" 68 69 self._stop_server = True 70 71 self.join() 72 self.httpd.server_close() 73 74 def run(self): 75 self.ready.set() 76 while not self._stop_server: 77 self.httpd.handle_request() 78 79# Authentication infrastructure 80 81class DigestAuthHandler: 82 """Handler for performing digest authentication.""" 83 84 def __init__(self): 85 self._request_num = 0 86 self._nonces = [] 87 self._users = {} 88 self._realm_name = "Test Realm" 89 self._qop = "auth" 90 91 def set_qop(self, qop): 92 self._qop = qop 93 94 def set_users(self, users): 95 assert isinstance(users, dict) 96 self._users = users 97 98 def set_realm(self, realm): 99 self._realm_name = realm 100 101 def _generate_nonce(self): 102 self._request_num += 1 103 nonce = hashlib.md5(str(self._request_num).encode("ascii")).hexdigest() 104 self._nonces.append(nonce) 105 return nonce 106 107 def _create_auth_dict(self, auth_str): 108 first_space_index = auth_str.find(" ") 109 auth_str = auth_str[first_space_index+1:] 110 111 parts = auth_str.split(",") 112 113 auth_dict = {} 114 for part in parts: 115 name, value = part.split("=") 116 name = name.strip() 117 if value[0] == '"' and value[-1] == '"': 118 value = value[1:-1] 119 else: 120 value = value.strip() 121 auth_dict[name] = value 122 return auth_dict 123 124 def _validate_auth(self, auth_dict, password, method, uri): 125 final_dict = {} 126 final_dict.update(auth_dict) 127 final_dict["password"] = password 128 final_dict["method"] = method 129 final_dict["uri"] = uri 130 HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict 131 HA1 = hashlib.md5(HA1_str.encode("ascii")).hexdigest() 132 HA2_str = "%(method)s:%(uri)s" % final_dict 133 HA2 = hashlib.md5(HA2_str.encode("ascii")).hexdigest() 134 final_dict["HA1"] = HA1 135 final_dict["HA2"] = HA2 136 response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \ 137 "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict 138 response = hashlib.md5(response_str.encode("ascii")).hexdigest() 139 140 return response == auth_dict["response"] 141 142 def _return_auth_challenge(self, request_handler): 143 request_handler.send_response(407, "Proxy Authentication Required") 144 request_handler.send_header("Content-Type", "text/html") 145 request_handler.send_header( 146 'Proxy-Authenticate', 'Digest realm="%s", ' 147 'qop="%s",' 148 'nonce="%s", ' % \ 149 (self._realm_name, self._qop, self._generate_nonce())) 150 # XXX: Not sure if we're supposed to add this next header or 151 # not. 152 #request_handler.send_header('Connection', 'close') 153 request_handler.end_headers() 154 request_handler.wfile.write(b"Proxy Authentication Required.") 155 return False 156 157 def handle_request(self, request_handler): 158 """Performs digest authentication on the given HTTP request 159 handler. Returns True if authentication was successful, False 160 otherwise. 161 162 If no users have been set, then digest auth is effectively 163 disabled and this method will always return True. 164 """ 165 166 if len(self._users) == 0: 167 return True 168 169 if "Proxy-Authorization" not in request_handler.headers: 170 return self._return_auth_challenge(request_handler) 171 else: 172 auth_dict = self._create_auth_dict( 173 request_handler.headers["Proxy-Authorization"] 174 ) 175 if auth_dict["username"] in self._users: 176 password = self._users[ auth_dict["username"] ] 177 else: 178 return self._return_auth_challenge(request_handler) 179 if not auth_dict.get("nonce") in self._nonces: 180 return self._return_auth_challenge(request_handler) 181 else: 182 self._nonces.remove(auth_dict["nonce"]) 183 184 auth_validated = False 185 186 # MSIE uses short_path in its validation, but Python's 187 # urllib.request uses the full path, so we're going to see if 188 # either of them works here. 189 190 for path in [request_handler.path, request_handler.short_path]: 191 if self._validate_auth(auth_dict, 192 password, 193 request_handler.command, 194 path): 195 auth_validated = True 196 197 if not auth_validated: 198 return self._return_auth_challenge(request_handler) 199 return True 200 201 202class BasicAuthHandler(http.server.BaseHTTPRequestHandler): 203 """Handler for performing basic authentication.""" 204 # Server side values 205 USER = 'testUser' 206 PASSWD = 'testPass' 207 REALM = 'Test' 208 USER_PASSWD = "%s:%s" % (USER, PASSWD) 209 ENCODED_AUTH = base64.b64encode(USER_PASSWD.encode('ascii')).decode('ascii') 210 211 def __init__(self, *args, **kwargs): 212 http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs) 213 214 def log_message(self, format, *args): 215 # Suppress console log message 216 pass 217 218 def do_HEAD(self): 219 self.send_response(200) 220 self.send_header("Content-type", "text/html") 221 self.end_headers() 222 223 def do_AUTHHEAD(self): 224 self.send_response(401) 225 self.send_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.REALM) 226 self.send_header("Content-type", "text/html") 227 self.end_headers() 228 229 def do_GET(self): 230 if not self.headers.get("Authorization", ""): 231 self.do_AUTHHEAD() 232 self.wfile.write(b"No Auth header received") 233 elif self.headers.get( 234 "Authorization", "") == "Basic " + self.ENCODED_AUTH: 235 self.send_response(200) 236 self.end_headers() 237 self.wfile.write(b"It works") 238 else: 239 # Request Unauthorized 240 self.do_AUTHHEAD() 241 242 243 244# Proxy test infrastructure 245 246class FakeProxyHandler(http.server.BaseHTTPRequestHandler): 247 """This is a 'fake proxy' that makes it look like the entire 248 internet has gone down due to a sudden zombie invasion. It main 249 utility is in providing us with authentication support for 250 testing. 251 """ 252 253 def __init__(self, digest_auth_handler, *args, **kwargs): 254 # This has to be set before calling our parent's __init__(), which will 255 # try to call do_GET(). 256 self.digest_auth_handler = digest_auth_handler 257 http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs) 258 259 def log_message(self, format, *args): 260 # Uncomment the next line for debugging. 261 # sys.stderr.write(format % args) 262 pass 263 264 def do_GET(self): 265 (scm, netloc, path, params, query, fragment) = urllib.parse.urlparse( 266 self.path, "http") 267 self.short_path = path 268 if self.digest_auth_handler.handle_request(self): 269 self.send_response(200, "OK") 270 self.send_header("Content-Type", "text/html") 271 self.end_headers() 272 self.wfile.write(bytes("You've reached %s!<BR>" % self.path, 273 "ascii")) 274 self.wfile.write(b"Our apologies, but our server is down due to " 275 b"a sudden zombie invasion.") 276 277# Test cases 278 279@unittest.skipUnless(threading, "Threading required for this test.") 280class BasicAuthTests(unittest.TestCase): 281 USER = "testUser" 282 PASSWD = "testPass" 283 INCORRECT_PASSWD = "Incorrect" 284 REALM = "Test" 285 286 def setUp(self): 287 super(BasicAuthTests, self).setUp() 288 # With Basic Authentication 289 def http_server_with_basic_auth_handler(*args, **kwargs): 290 return BasicAuthHandler(*args, **kwargs) 291 self.server = LoopbackHttpServerThread(http_server_with_basic_auth_handler) 292 self.addCleanup(self.server.stop) 293 self.server_url = 'http://127.0.0.1:%s' % self.server.port 294 self.server.start() 295 self.server.ready.wait() 296 297 def tearDown(self): 298 super(BasicAuthTests, self).tearDown() 299 300 def test_basic_auth_success(self): 301 ah = urllib.request.HTTPBasicAuthHandler() 302 ah.add_password(self.REALM, self.server_url, self.USER, self.PASSWD) 303 urllib.request.install_opener(urllib.request.build_opener(ah)) 304 try: 305 self.assertTrue(urllib.request.urlopen(self.server_url)) 306 except urllib.error.HTTPError: 307 self.fail("Basic auth failed for the url: %s", self.server_url) 308 309 def test_basic_auth_httperror(self): 310 ah = urllib.request.HTTPBasicAuthHandler() 311 ah.add_password(self.REALM, self.server_url, self.USER, self.INCORRECT_PASSWD) 312 urllib.request.install_opener(urllib.request.build_opener(ah)) 313 self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, self.server_url) 314 315 316@unittest.skipUnless(threading, "Threading required for this test.") 317class ProxyAuthTests(unittest.TestCase): 318 URL = "http://localhost" 319 320 USER = "tester" 321 PASSWD = "test123" 322 REALM = "TestRealm" 323 324 def setUp(self): 325 super(ProxyAuthTests, self).setUp() 326 # Ignore proxy bypass settings in the environment. 327 def restore_environ(old_environ): 328 os.environ.clear() 329 os.environ.update(old_environ) 330 self.addCleanup(restore_environ, os.environ.copy()) 331 os.environ['NO_PROXY'] = '' 332 os.environ['no_proxy'] = '' 333 334 self.digest_auth_handler = DigestAuthHandler() 335 self.digest_auth_handler.set_users({self.USER: self.PASSWD}) 336 self.digest_auth_handler.set_realm(self.REALM) 337 # With Digest Authentication. 338 def create_fake_proxy_handler(*args, **kwargs): 339 return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs) 340 341 self.server = LoopbackHttpServerThread(create_fake_proxy_handler) 342 self.server.start() 343 self.server.ready.wait() 344 proxy_url = "http://127.0.0.1:%d" % self.server.port 345 handler = urllib.request.ProxyHandler({"http" : proxy_url}) 346 self.proxy_digest_handler = urllib.request.ProxyDigestAuthHandler() 347 self.opener = urllib.request.build_opener( 348 handler, self.proxy_digest_handler) 349 350 def tearDown(self): 351 self.server.stop() 352 super(ProxyAuthTests, self).tearDown() 353 354 def test_proxy_with_bad_password_raises_httperror(self): 355 self.proxy_digest_handler.add_password(self.REALM, self.URL, 356 self.USER, self.PASSWD+"bad") 357 self.digest_auth_handler.set_qop("auth") 358 self.assertRaises(urllib.error.HTTPError, 359 self.opener.open, 360 self.URL) 361 362 def test_proxy_with_no_password_raises_httperror(self): 363 self.digest_auth_handler.set_qop("auth") 364 self.assertRaises(urllib.error.HTTPError, 365 self.opener.open, 366 self.URL) 367 368 def test_proxy_qop_auth_works(self): 369 self.proxy_digest_handler.add_password(self.REALM, self.URL, 370 self.USER, self.PASSWD) 371 self.digest_auth_handler.set_qop("auth") 372 result = self.opener.open(self.URL) 373 while result.read(): 374 pass 375 result.close() 376 377 def test_proxy_qop_auth_int_works_or_throws_urlerror(self): 378 self.proxy_digest_handler.add_password(self.REALM, self.URL, 379 self.USER, self.PASSWD) 380 self.digest_auth_handler.set_qop("auth-int") 381 try: 382 result = self.opener.open(self.URL) 383 except urllib.error.URLError: 384 # It's okay if we don't support auth-int, but we certainly 385 # shouldn't receive any kind of exception here other than 386 # a URLError. 387 result = None 388 if result: 389 while result.read(): 390 pass 391 result.close() 392 393 394def GetRequestHandler(responses): 395 396 class FakeHTTPRequestHandler(http.server.BaseHTTPRequestHandler): 397 398 server_version = "TestHTTP/" 399 requests = [] 400 headers_received = [] 401 port = 80 402 403 def do_GET(self): 404 body = self.send_head() 405 while body: 406 done = self.wfile.write(body) 407 body = body[done:] 408 409 def do_POST(self): 410 content_length = self.headers["Content-Length"] 411 post_data = self.rfile.read(int(content_length)) 412 self.do_GET() 413 self.requests.append(post_data) 414 415 def send_head(self): 416 FakeHTTPRequestHandler.headers_received = self.headers 417 self.requests.append(self.path) 418 response_code, headers, body = responses.pop(0) 419 420 self.send_response(response_code) 421 422 for (header, value) in headers: 423 self.send_header(header, value % {'port':self.port}) 424 if body: 425 self.send_header("Content-type", "text/plain") 426 self.end_headers() 427 return body 428 self.end_headers() 429 430 def log_message(self, *args): 431 pass 432 433 434 return FakeHTTPRequestHandler 435 436 437@unittest.skipUnless(threading, "Threading required for this test.") 438class TestUrlopen(unittest.TestCase): 439 """Tests urllib.request.urlopen using the network. 440 441 These tests are not exhaustive. Assuming that testing using files does a 442 good job overall of some of the basic interface features. There are no 443 tests exercising the optional 'data' and 'proxies' arguments. No tests 444 for transparent redirection have been written. 445 """ 446 447 def setUp(self): 448 super(TestUrlopen, self).setUp() 449 450 # Ignore proxies for localhost tests. 451 def restore_environ(old_environ): 452 os.environ.clear() 453 os.environ.update(old_environ) 454 self.addCleanup(restore_environ, os.environ.copy()) 455 os.environ['NO_PROXY'] = '*' 456 os.environ['no_proxy'] = '*' 457 458 def urlopen(self, url, data=None, **kwargs): 459 l = [] 460 f = urllib.request.urlopen(url, data, **kwargs) 461 try: 462 # Exercise various methods 463 l.extend(f.readlines(200)) 464 l.append(f.readline()) 465 l.append(f.read(1024)) 466 l.append(f.read()) 467 finally: 468 f.close() 469 return b"".join(l) 470 471 def start_server(self, responses=None): 472 if responses is None: 473 responses = [(200, [], b"we don't care")] 474 handler = GetRequestHandler(responses) 475 476 self.server = LoopbackHttpServerThread(handler) 477 self.addCleanup(self.server.stop) 478 self.server.start() 479 self.server.ready.wait() 480 port = self.server.port 481 handler.port = port 482 return handler 483 484 def start_https_server(self, responses=None, **kwargs): 485 if not hasattr(urllib.request, 'HTTPSHandler'): 486 self.skipTest('ssl support required') 487 from test.ssl_servers import make_https_server 488 if responses is None: 489 responses = [(200, [], b"we care a bit")] 490 handler = GetRequestHandler(responses) 491 server = make_https_server(self, handler_class=handler, **kwargs) 492 handler.port = server.port 493 return handler 494 495 def test_redirection(self): 496 expected_response = b"We got here..." 497 responses = [ 498 (302, [("Location", "http://localhost:%(port)s/somewhere_else")], 499 ""), 500 (200, [], expected_response) 501 ] 502 503 handler = self.start_server(responses) 504 data = self.urlopen("http://localhost:%s/" % handler.port) 505 self.assertEqual(data, expected_response) 506 self.assertEqual(handler.requests, ["/", "/somewhere_else"]) 507 508 def test_chunked(self): 509 expected_response = b"hello world" 510 chunked_start = ( 511 b'a\r\n' 512 b'hello worl\r\n' 513 b'1\r\n' 514 b'd\r\n' 515 b'0\r\n' 516 ) 517 response = [(200, [("Transfer-Encoding", "chunked")], chunked_start)] 518 handler = self.start_server(response) 519 data = self.urlopen("http://localhost:%s/" % handler.port) 520 self.assertEqual(data, expected_response) 521 522 def test_404(self): 523 expected_response = b"Bad bad bad..." 524 handler = self.start_server([(404, [], expected_response)]) 525 526 try: 527 self.urlopen("http://localhost:%s/weeble" % handler.port) 528 except urllib.error.URLError as f: 529 data = f.read() 530 f.close() 531 else: 532 self.fail("404 should raise URLError") 533 534 self.assertEqual(data, expected_response) 535 self.assertEqual(handler.requests, ["/weeble"]) 536 537 def test_200(self): 538 expected_response = b"pycon 2008..." 539 handler = self.start_server([(200, [], expected_response)]) 540 data = self.urlopen("http://localhost:%s/bizarre" % handler.port) 541 self.assertEqual(data, expected_response) 542 self.assertEqual(handler.requests, ["/bizarre"]) 543 544 def test_200_with_parameters(self): 545 expected_response = b"pycon 2008..." 546 handler = self.start_server([(200, [], expected_response)]) 547 data = self.urlopen("http://localhost:%s/bizarre" % handler.port, 548 b"get=with_feeling") 549 self.assertEqual(data, expected_response) 550 self.assertEqual(handler.requests, ["/bizarre", b"get=with_feeling"]) 551 552 def test_https(self): 553 handler = self.start_https_server() 554 context = ssl.create_default_context(cafile=CERT_localhost) 555 data = self.urlopen("https://localhost:%s/bizarre" % handler.port, context=context) 556 self.assertEqual(data, b"we care a bit") 557 558 def test_https_with_cafile(self): 559 handler = self.start_https_server(certfile=CERT_localhost) 560 with support.check_warnings(('', DeprecationWarning)): 561 # Good cert 562 data = self.urlopen("https://localhost:%s/bizarre" % handler.port, 563 cafile=CERT_localhost) 564 self.assertEqual(data, b"we care a bit") 565 # Bad cert 566 with self.assertRaises(urllib.error.URLError) as cm: 567 self.urlopen("https://localhost:%s/bizarre" % handler.port, 568 cafile=CERT_fakehostname) 569 # Good cert, but mismatching hostname 570 handler = self.start_https_server(certfile=CERT_fakehostname) 571 with self.assertRaises(ssl.CertificateError) as cm: 572 self.urlopen("https://localhost:%s/bizarre" % handler.port, 573 cafile=CERT_fakehostname) 574 575 def test_https_with_cadefault(self): 576 handler = self.start_https_server(certfile=CERT_localhost) 577 # Self-signed cert should fail verification with system certificate store 578 with support.check_warnings(('', DeprecationWarning)): 579 with self.assertRaises(urllib.error.URLError) as cm: 580 self.urlopen("https://localhost:%s/bizarre" % handler.port, 581 cadefault=True) 582 583 def test_https_sni(self): 584 if ssl is None: 585 self.skipTest("ssl module required") 586 if not ssl.HAS_SNI: 587 self.skipTest("SNI support required in OpenSSL") 588 sni_name = None 589 def cb_sni(ssl_sock, server_name, initial_context): 590 nonlocal sni_name 591 sni_name = server_name 592 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 593 context.set_servername_callback(cb_sni) 594 handler = self.start_https_server(context=context, certfile=CERT_localhost) 595 context = ssl.create_default_context(cafile=CERT_localhost) 596 self.urlopen("https://localhost:%s" % handler.port, context=context) 597 self.assertEqual(sni_name, "localhost") 598 599 def test_sending_headers(self): 600 handler = self.start_server() 601 req = urllib.request.Request("http://localhost:%s/" % handler.port, 602 headers={"Range": "bytes=20-39"}) 603 with urllib.request.urlopen(req): 604 pass 605 self.assertEqual(handler.headers_received["Range"], "bytes=20-39") 606 607 def test_basic(self): 608 handler = self.start_server() 609 open_url = urllib.request.urlopen("http://localhost:%s" % handler.port) 610 for attr in ("read", "close", "info", "geturl"): 611 self.assertTrue(hasattr(open_url, attr), "object returned from " 612 "urlopen lacks the %s attribute" % attr) 613 try: 614 self.assertTrue(open_url.read(), "calling 'read' failed") 615 finally: 616 open_url.close() 617 618 def test_info(self): 619 handler = self.start_server() 620 open_url = urllib.request.urlopen( 621 "http://localhost:%s" % handler.port) 622 with open_url: 623 info_obj = open_url.info() 624 self.assertIsInstance(info_obj, email.message.Message, 625 "object returned by 'info' is not an " 626 "instance of email.message.Message") 627 self.assertEqual(info_obj.get_content_subtype(), "plain") 628 629 def test_geturl(self): 630 # Make sure same URL as opened is returned by geturl. 631 handler = self.start_server() 632 open_url = urllib.request.urlopen("http://localhost:%s" % handler.port) 633 with open_url: 634 url = open_url.geturl() 635 self.assertEqual(url, "http://localhost:%s" % handler.port) 636 637 def test_iteration(self): 638 expected_response = b"pycon 2008..." 639 handler = self.start_server([(200, [], expected_response)]) 640 data = urllib.request.urlopen("http://localhost:%s" % handler.port) 641 for line in data: 642 self.assertEqual(line, expected_response) 643 644 def test_line_iteration(self): 645 lines = [b"We\n", b"got\n", b"here\n", b"verylong " * 8192 + b"\n"] 646 expected_response = b"".join(lines) 647 handler = self.start_server([(200, [], expected_response)]) 648 data = urllib.request.urlopen("http://localhost:%s" % handler.port) 649 for index, line in enumerate(data): 650 self.assertEqual(line, lines[index], 651 "Fetched line number %s doesn't match expected:\n" 652 " Expected length was %s, got %s" % 653 (index, len(lines[index]), len(line))) 654 self.assertEqual(index + 1, len(lines)) 655 656 657threads_key = None 658 659def setUpModule(): 660 # Store the threading_setup in a key and ensure that it is cleaned up 661 # in the tearDown 662 global threads_key 663 threads_key = support.threading_setup() 664 665def tearDownModule(): 666 if threads_key: 667 support.threading_cleanup(threads_key) 668 669if __name__ == "__main__": 670 unittest.main() 671