import base64
import os
import email
import urllib.error
import urllib.parse
import urllib.request
import http.server
import threading
import unittest
import hashlib

from test.support import hashlib_helper
from test.support import threading_helper
from test.support import warnings_helper

try:
    import ssl
except ImportError:
    ssl = None

here = os.path.dirname(__file__)
# Self-signed cert file for 'localhost'
CERT_localhost = os.path.join(here, 'keycert.pem')
# Self-signed cert file for 'fakehostname'
CERT_fakehostname = os.path.join(here, 'keycert2.pem')


# Loopback http server infrastructure

class LoopbackHttpServer(http.server.HTTPServer):
    """HTTP server w/ a few modifications that make it useful for
    loopback testing purposes.
    """

    def __init__(self, server_address, RequestHandlerClass):
        super().__init__(server_address, RequestHandlerClass)

        # Set the timeout of our listening socket really low so
        # that we can stop the server easily.
        self.socket.settimeout(0.1)

    def get_request(self):
        """HTTPServer method, overridden.

        Accept one connection and put a timeout on the accepted socket.
        Returns the usual ``(request, client_address)`` pair.
        """
        request, client_address = self.socket.accept()

        # It's a loopback connection, so bounding how long a read or
        # write may block shouldn't affect anything, but should make
        # deadlocks less likely to occur.
        request.settimeout(10.0)

        return (request, client_address)


class LoopbackHttpServerThread(threading.Thread):
    """Stoppable thread that runs a loopback http server."""

    def __init__(self, request_handler):
        """Bind a LoopbackHttpServer to an ephemeral port on 127.0.0.1.

        *request_handler* is the callable handed to the server as its
        request handler class.  The port actually bound is published as
        ``self.port``; ``self.ready`` is set once run() has started.
        """
        super().__init__()
        self._stop_server = False
        self.ready = threading.Event()
        request_handler.protocol_version = "HTTP/1.0"
        # Port 0 lets the OS pick a free port.
        self.httpd = LoopbackHttpServer(("127.0.0.1", 0),
                                        request_handler)
        self.port = self.httpd.server_port

    def stop(self):
        """Stops the webserver if it's currently running."""
        # run() polls this flag between handle_request() calls.
        self._stop_server = True

        self.join()
        self.httpd.server_close()

    def run(self):
        """Serve requests one at a time until stop() is called."""
        self.ready.set()
        while not self._stop_server:
            self.httpd.handle_request()


# Authentication infrastructure

class DigestAuthHandler:
    """Handler for performing digest authentication."""

    def __init__(self):
        self._request_num = 0
        # Nonces that were issued in a challenge and not yet consumed.
        self._nonces = []
        # Maps user name -> password; empty means auth is disabled.
        self._users = {}
        self._realm_name = "Test Realm"
        self._qop = "auth"

    def set_qop(self, qop):
        """Set the qop value advertised in the challenge."""
        self._qop = qop

    def set_users(self, users):
        """Replace the user name -> password mapping."""
        assert isinstance(users, dict)
        self._users = users

    def set_realm(self, realm):
        """Set the realm name advertised in the challenge."""
        self._realm_name = realm

    def _generate_nonce(self):
        """Create, remember and return a fresh nonce.

        The nonce is the MD5 hex digest of an incrementing request
        counter, so it is deterministic within one handler instance.
        """
        self._request_num += 1
        nonce = hashlib.md5(str(self._request_num).encode("ascii")).hexdigest()
        self._nonces.append(nonce)
        return nonce

    def _create_auth_dict(self, auth_str):
        """Parse an authorization header value into a dict.

        Everything up to the first space (the auth scheme) is dropped;
        the rest is treated as comma-separated ``name=value`` pairs.
        Surrounding double quotes are removed from values.
        """
        first_space_index = auth_str.find(" ")
        auth_str = auth_str[first_space_index+1:]

        auth_dict = {}
        for part in auth_str.split(","):
            if not part.strip():
                # Tolerate empty items such as a trailing comma.
                continue
            # Split on the first "=" only: values (e.g. a uri with a
            # query string) may themselves contain "=".
            name, _, value = part.partition("=")
            # Strip before testing for quotes so padded quoted values
            # are unquoted too.
            value = value.strip()
            if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
                value = value[1:-1]
            auth_dict[name.strip()] = value
        return auth_dict

    def _validate_auth(self, auth_dict, password, method, uri):
        """Return True if auth_dict["response"] matches the digest
        computed from *password*, *method* and *uri*.

        The expected value is
        MD5(HA1:nonce:nc:cnonce:qop:HA2) with
        HA1 = MD5(username:realm:password) and HA2 = MD5(method:uri).
        Returns False when a field needed for the computation is
        missing from *auth_dict*.
        """
        final_dict = {}
        final_dict.update(auth_dict)
        final_dict["password"] = password
        final_dict["method"] = method
        final_dict["uri"] = uri
        try:
            HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
            HA1 = hashlib.md5(HA1_str.encode("ascii")).hexdigest()
            HA2_str = "%(method)s:%(uri)s" % final_dict
            HA2 = hashlib.md5(HA2_str.encode("ascii")).hexdigest()
            final_dict["HA1"] = HA1
            final_dict["HA2"] = HA2
            response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
                           "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
            response = hashlib.md5(response_str.encode("ascii")).hexdigest()

            return response == auth_dict["response"]
        except KeyError:
            # An incomplete header cannot be valid; let the caller
            # answer with a new challenge instead of raising in the
            # server thread.
            return False

    def _return_auth_challenge(self, request_handler):
        """Send a 407 response carrying a fresh digest challenge.

        Always returns False so callers can ``return`` the result
        directly from handle_request().
        """
        request_handler.send_response(407, "Proxy Authentication Required")
        request_handler.send_header("Content-Type", "text/html")
        request_handler.send_header(
            'Proxy-Authenticate', 'Digest realm="%s", '
            'qop="%s",'
            'nonce="%s", ' % \
            (self._realm_name, self._qop, self._generate_nonce()))
        # XXX: Not sure if we're supposed to add this next header or
        # not.
        #request_handler.send_header('Connection', 'close')
        request_handler.end_headers()
        request_handler.wfile.write(b"Proxy Authentication Required.")
        return False

    def handle_request(self, request_handler):
        """Performs digest authentication on the given HTTP request
        handler.  Returns True if authentication was successful, False
        otherwise.

        If no users have been set, then digest auth is effectively
        disabled and this method will always return True.
        """
        if not self._users:
            return True

        if "Proxy-Authorization" not in request_handler.headers:
            return self._return_auth_challenge(request_handler)

        auth_dict = self._create_auth_dict(
            request_handler.headers["Proxy-Authorization"]
        )

        # A missing or unknown user name gets a new challenge.
        username = auth_dict.get("username")
        if username not in self._users:
            return self._return_auth_challenge(request_handler)
        password = self._users[username]

        # Each nonce may be used once: it has to be one we issued and
        # it is consumed here.
        nonce = auth_dict.get("nonce")
        if nonce not in self._nonces:
            return self._return_auth_challenge(request_handler)
        self._nonces.remove(nonce)

        # MSIE uses short_path in its validation, but Python's
        # urllib.request uses the full path, so we're going to see if
        # either of them works here.
        auth_validated = any(
            self._validate_auth(auth_dict, password,
                                request_handler.command, path)
            for path in (request_handler.path, request_handler.short_path)
        )

        if not auth_validated:
            return self._return_auth_challenge(request_handler)
        return True


class BasicAuthHandler(http.server.BaseHTTPRequestHandler):
    """Handler for performing basic authentication."""
    # Server side values
    USER = 'testUser'
    PASSWD = 'testPass'
    REALM = 'Test'
    USER_PASSWD = "%s:%s" % (USER, PASSWD)
    ENCODED_AUTH = base64.b64encode(USER_PASSWD.encode('ascii')).decode('ascii')

    def log_message(self, format, *args):
        # Suppress console log message
        pass

    def do_HEAD(self):
        """Send a bare 200 response with a text/html content type."""
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()

    def do_AUTHHEAD(self):
        """Send a 401 response carrying the Basic auth challenge."""
        self.send_response(401)
        self.send_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.REALM)
        self.send_header("Content-type", "text/html")
        self.end_headers()

    def do_GET(self):
        """Answer 200 only for the expected Basic credentials.

        A missing/empty Authorization header and wrong credentials
        both get the 401 challenge.
        """
        authorization = self.headers.get("Authorization", "")
        if not authorization:
            self.do_AUTHHEAD()
            self.wfile.write(b"No Auth header received")
        elif authorization == "Basic " + self.ENCODED_AUTH:
            self.send_response(200)
            self.end_headers()
            self.wfile.write(b"It works")
        else:
            # Request Unauthorized
            self.do_AUTHHEAD()


# Proxy test infrastructure

class FakeProxyHandler(http.server.BaseHTTPRequestHandler):
    """This is a 'fake proxy' that makes it look like the entire
    internet has gone down due to a sudden zombie invasion.  Its main
    utility is in providing us with authentication support for
    testing.
    """

    def __init__(self, digest_auth_handler, *args, **kwargs):
        # This has to be set before calling our parent's __init__(), which will
        # try to call do_GET().
        self.digest_auth_handler = digest_auth_handler
        super().__init__(*args, **kwargs)

    def log_message(self, format, *args):
        # Uncomment the next line for debugging (needs "import sys").
        # sys.stderr.write(format % args)
        pass

    def do_GET(self):
        """Authenticate the request, then send the canned page."""
        (scm, netloc, path, params, query, fragment) = urllib.parse.urlparse(
            self.path, "http")
        # DigestAuthHandler.handle_request() reads short_path as the
        # alternative uri to validate against.
        self.short_path = path
        if self.digest_auth_handler.handle_request(self):
            self.send_response(200, "OK")
            self.send_header("Content-Type", "text/html")
            self.end_headers()
            self.wfile.write(bytes("You've reached %s!<BR>" % self.path,
                                   "ascii"))
            self.wfile.write(b"Our apologies, but our server is down due to "
                             b"a sudden zombie invasion.")


# Test cases

class BasicAuthTests(unittest.TestCase):
    USER = "testUser"
    PASSWD = "testPass"
    INCORRECT_PASSWD = "Incorrect"
    REALM = "Test"

    def setUp(self):
        super().setUp()
        # The tests call install_opener(); urlcleanup() drops that
        # global opener again so it cannot leak into other tests.
        self.addCleanup(urllib.request.urlcleanup)

        # With Basic Authentication
        def http_server_with_basic_auth_handler(*args, **kwargs):
            return BasicAuthHandler(*args, **kwargs)
        self.server = LoopbackHttpServerThread(http_server_with_basic_auth_handler)
        self.addCleanup(self.stop_server)
        self.server_url = 'http://127.0.0.1:%s' % self.server.port
        self.server.start()
        self.server.ready.wait()

    def stop_server(self):
        self.server.stop()
        self.server = None

    def test_basic_auth_success(self):
        ah = urllib.request.HTTPBasicAuthHandler()
        ah.add_password(self.REALM, self.server_url, self.USER, self.PASSWD)
        urllib.request.install_opener(urllib.request.build_opener(ah))
        try:
            # Close the response deterministically instead of leaving
            # it to garbage collection.
            with urllib.request.urlopen(self.server_url) as response:
                self.assertTrue(response)
        except urllib.error.HTTPError:
            self.fail("Basic auth failed for the url: %s" % self.server_url)

    def test_basic_auth_httperror(self):
        ah = urllib.request.HTTPBasicAuthHandler()
        ah.add_password(self.REALM, self.server_url, self.USER, self.INCORRECT_PASSWD)
        urllib.request.install_opener(urllib.request.build_opener(ah))
        self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, self.server_url)


@hashlib_helper.requires_hashdigest("md5", openssl=True)
class ProxyAuthTests(unittest.TestCase):
    URL = "http://localhost"

    USER = "tester"
    PASSWD = "test123"
    REALM = "TestRealm"

    def setUp(self):
        super().setUp()

        # Ignore proxy bypass settings in the environment.
        def restore_environ(old_environ):
            os.environ.clear()
            os.environ.update(old_environ)
        self.addCleanup(restore_environ, os.environ.copy())
        os.environ['NO_PROXY'] = ''
        os.environ['no_proxy'] = ''

        self.digest_auth_handler = DigestAuthHandler()
        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
        self.digest_auth_handler.set_realm(self.REALM)

        # With Digest Authentication.
        def create_fake_proxy_handler(*args, **kwargs):
            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
        self.addCleanup(self.stop_server)
        self.server.start()
        self.server.ready.wait()
        proxy_url = "http://127.0.0.1:%d" % self.server.port
        handler = urllib.request.ProxyHandler({"http" : proxy_url})
        self.proxy_digest_handler = urllib.request.ProxyDigestAuthHandler()
        self.opener = urllib.request.build_opener(
            handler, self.proxy_digest_handler)

    def stop_server(self):
        self.server.stop()
        self.server = None

    def test_proxy_with_bad_password_raises_httperror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD+"bad")
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib.error.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_with_no_password_raises_httperror(self):
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib.error.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_qop_auth_works(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth")
        with self.opener.open(self.URL) as result:
            # Drain the body.
            while result.read():
                pass

    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth-int")
        try:
            result = self.opener.open(self.URL)
        except urllib.error.URLError:
            # It's okay if we don't support auth-int, but we certainly
            # shouldn't receive any kind of exception here other than
            # a URLError.
            pass
        else:
            with result:
                while result.read():
                    pass


def GetRequestHandler(responses):
    """Return a new request handler class that replays *responses*.

    *responses* is a list of ``(status_code, headers, body)`` tuples;
    one tuple is popped from the front for every request served.
    *headers* is a sequence of ``(name, value)`` pairs whose values may
    contain ``%(port)s``, which is filled in from the class attribute
    ``port``.  The returned class records what it saw in its
    ``requests`` and ``headers_received`` class attributes.
    """

    class FakeHTTPRequestHandler(http.server.BaseHTTPRequestHandler):

        server_version = "TestHTTP/"
        # Request paths (and POST bodies) in the order received.
        requests = []
        # Headers of the most recent request.
        headers_received = []
        # Overwritten by the test once the real server port is known.
        port = 80

        def do_GET(self):
            body = self.send_head()
            # write() may accept only part of the data; keep going
            # until everything has been handed over.
            while body:
                done = self.wfile.write(body)
                body = body[done:]

        def do_POST(self):
            content_length = self.headers["Content-Length"]
            post_data = self.rfile.read(int(content_length))
            self.do_GET()
            self.requests.append(post_data)

        def send_head(self):
            """Send status line and headers of the next canned response.

            Returns the body if there is one, otherwise None.
            """
            FakeHTTPRequestHandler.headers_received = self.headers
            self.requests.append(self.path)
            response_code, headers, body = responses.pop(0)

            self.send_response(response_code)

            for (header, value) in headers:
                self.send_header(header, value % {'port':self.port})
            if body:
                self.send_header("Content-type", "text/plain")
                self.end_headers()
                return body
            self.end_headers()

        def log_message(self, *args):
            pass


    return FakeHTTPRequestHandler


class TestUrlopen(unittest.TestCase):
    """Tests urllib.request.urlopen using the network.

    These tests are not exhaustive.  Assuming that testing using files does a
    good job overall of some of the basic interface features.  There are no
    tests exercising the optional 'data' and 'proxies' arguments.  No tests
    for transparent redirection have been written.
    """

    def setUp(self):
        super().setUp()

        # clear _opener global variable
        self.addCleanup(urllib.request.urlcleanup)

        # Ignore proxies for localhost tests.
        def restore_environ(old_environ):
            os.environ.clear()
            os.environ.update(old_environ)
        self.addCleanup(restore_environ, os.environ.copy())
        os.environ['NO_PROXY'] = '*'
        os.environ['no_proxy'] = '*'

    def urlopen(self, url, data=None, **kwargs):
        """Open *url* and return the complete response body as bytes.

        The body is read through several different methods on purpose,
        and the response is always closed.
        """
        l = []
        f = urllib.request.urlopen(url, data, **kwargs)
        try:
            # Exercise various methods
            l.extend(f.readlines(200))
            l.append(f.readline())
            l.append(f.read(1024))
            l.append(f.read())
        finally:
            f.close()
        return b"".join(l)

    def stop_server(self):
        self.server.stop()
        self.server = None

    def start_server(self, responses=None):
        """Start a loopback HTTP server replaying *responses*.

        Returns the handler class created by GetRequestHandler(), with
        its ``port`` attribute set to the server's port.  The server is
        stopped again via addCleanup().
        """
        if responses is None:
            responses = [(200, [], b"we don't care")]
        handler = GetRequestHandler(responses)

        self.server = LoopbackHttpServerThread(handler)
        self.addCleanup(self.stop_server)
        self.server.start()
        self.server.ready.wait()
        port = self.server.port
        handler.port = port
        return handler

    def start_https_server(self, responses=None, **kwargs):
        """Start an HTTPS server replaying *responses*.

        Skips the test when urllib.request has no HTTPSHandler.  Extra
        keyword arguments are passed on to
        test.ssl_servers.make_https_server().  Returns the handler
        class with its ``port`` attribute set.
        """
        if not hasattr(urllib.request, 'HTTPSHandler'):
            self.skipTest('ssl support required')
        from test.ssl_servers import make_https_server
        if responses is None:
            responses = [(200, [], b"we care a bit")]
        handler = GetRequestHandler(responses)
        server = make_https_server(self, handler_class=handler, **kwargs)
        handler.port = server.port
        return handler

    def test_redirection(self):
        expected_response = b"We got here..."
        responses = [
            (302, [("Location", "http://localhost:%(port)s/somewhere_else")],
             ""),
            (200, [], expected_response)
        ]

        handler = self.start_server(responses)
        data = self.urlopen("http://localhost:%s/" % handler.port)
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/", "/somewhere_else"])

    def test_chunked(self):
        expected_response = b"hello world"
        chunked_start = (
                        b'a\r\n'
                        b'hello worl\r\n'
                        b'1\r\n'
                        b'd\r\n'
                        b'0\r\n'
                        )
        response = [(200, [("Transfer-Encoding", "chunked")], chunked_start)]
        handler = self.start_server(response)
        data = self.urlopen("http://localhost:%s/" % handler.port)
        self.assertEqual(data, expected_response)

    def test_404(self):
        expected_response = b"Bad bad bad..."
        handler = self.start_server([(404, [], expected_response)])

        try:
            self.urlopen("http://localhost:%s/weeble" % handler.port)
        except urllib.error.URLError as f:
            data = f.read()
            f.close()
        else:
            self.fail("404 should raise URLError")

        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/weeble"])

    def test_200(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        data = self.urlopen("http://localhost:%s/bizarre" % handler.port)
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/bizarre"])

    def test_200_with_parameters(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        data = self.urlopen("http://localhost:%s/bizarre" % handler.port,
                             b"get=with_feeling")
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/bizarre", b"get=with_feeling"])

    def test_https(self):
        handler = self.start_https_server()
        context = ssl.create_default_context(cafile=CERT_localhost)
        data = self.urlopen("https://localhost:%s/bizarre" % handler.port, context=context)
        self.assertEqual(data, b"we care a bit")

    def test_https_with_cafile(self):
        handler = self.start_https_server(certfile=CERT_localhost)
        with warnings_helper.check_warnings(('', DeprecationWarning)):
            # Good cert
            data = self.urlopen("https://localhost:%s/bizarre" % handler.port,
                                cafile=CERT_localhost)
            self.assertEqual(data, b"we care a bit")
            # Bad cert
            with self.assertRaises(urllib.error.URLError):
                self.urlopen("https://localhost:%s/bizarre" % handler.port,
                             cafile=CERT_fakehostname)
            # Good cert, but mismatching hostname
            handler = self.start_https_server(certfile=CERT_fakehostname)
            with self.assertRaises(urllib.error.URLError):
                self.urlopen("https://localhost:%s/bizarre" % handler.port,
                             cafile=CERT_fakehostname)

    def test_https_with_cadefault(self):
        handler = self.start_https_server(certfile=CERT_localhost)
        # Self-signed cert should fail verification with system certificate store
        with warnings_helper.check_warnings(('', DeprecationWarning)):
            with self.assertRaises(urllib.error.URLError):
                self.urlopen("https://localhost:%s/bizarre" % handler.port,
                             cadefault=True)

    def test_https_sni(self):
        if ssl is None:
            self.skipTest("ssl module required")
        if not ssl.HAS_SNI:
            self.skipTest("SNI support required in OpenSSL")
        sni_name = None
        def cb_sni(ssl_sock, server_name, initial_context):
            nonlocal sni_name
            sni_name = server_name
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.set_servername_callback(cb_sni)
        handler = self.start_https_server(context=context, certfile=CERT_localhost)
        context = ssl.create_default_context(cafile=CERT_localhost)
        self.urlopen("https://localhost:%s" % handler.port, context=context)
        self.assertEqual(sni_name, "localhost")

    def test_sending_headers(self):
        handler = self.start_server()
        req = urllib.request.Request("http://localhost:%s/" % handler.port,
                                     headers={"Range": "bytes=20-39"})
        with urllib.request.urlopen(req):
            pass
        self.assertEqual(handler.headers_received["Range"], "bytes=20-39")

    def test_basic(self):
        handler = self.start_server()
        with urllib.request.urlopen("http://localhost:%s" % handler.port) as open_url:
            for attr in ("read", "close", "info", "geturl"):
                self.assertTrue(hasattr(open_url, attr), "object returned from "
                             "urlopen lacks the %s attribute" % attr)
            self.assertTrue(open_url.read(), "calling 'read' failed")

    def test_info(self):
        handler = self.start_server()
        open_url = urllib.request.urlopen(
            "http://localhost:%s" % handler.port)
        with open_url:
            info_obj = open_url.info()
            self.assertIsInstance(info_obj, email.message.Message,
                                  "object returned by 'info' is not an "
                                  "instance of email.message.Message")
            self.assertEqual(info_obj.get_content_subtype(), "plain")

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        handler = self.start_server()
        open_url = urllib.request.urlopen("http://localhost:%s" % handler.port)
        with open_url:
            url = open_url.geturl()
        self.assertEqual(url, "http://localhost:%s" % handler.port)

    def test_iteration(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        # Use the response as a context manager so it is closed even
        # when an assertion fails.
        with urllib.request.urlopen("http://localhost:%s" % handler.port) as data:
            for line in data:
                self.assertEqual(line, expected_response)

    def test_line_iteration(self):
        lines = [b"We\n", b"got\n", b"here\n", b"verylong " * 8192 + b"\n"]
        expected_response = b"".join(lines)
        handler = self.start_server([(200, [], expected_response)])
        # Start at -1 so an empty response fails the final count check
        # instead of raising NameError.
        index = -1
        with urllib.request.urlopen("http://localhost:%s" % handler.port) as data:
            for index, line in enumerate(data):
                self.assertEqual(line, lines[index],
                                 "Fetched line number %s doesn't match expected:\n"
                                 "    Expected length was %s, got %s" %
                                 (index, len(lines[index]), len(line)))
        self.assertEqual(index + 1, len(lines))


def setUpModule():
    thread_info = threading_helper.threading_setup()
    unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)


if __name__ == "__main__":
    unittest.main()