#!/usr/bin/env python
"""Tests for urllib2 against a local loopback HTTP server.

Python 2 test module: it spins up a real HTTP server on 127.0.0.1 in a
background thread and exercises urllib2's proxy digest authentication
and basic urlopen behavior against it.  No external network access is
required (or used).
"""

import urlparse
import urllib2
import BaseHTTPServer
import unittest
import hashlib
from test import test_support
# mimetools is deprecated in 2.x; import_module suppresses the warning.
mimetools = test_support.import_module('mimetools', deprecated=True)
threading = test_support.import_module('threading')

# Loopback http server infrastructure

class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
    """HTTP server w/ a few modifications that make it useful for
    loopback testing purposes.
    """

    def __init__(self, server_address, RequestHandlerClass):
        BaseHTTPServer.HTTPServer.__init__(self,
                                           server_address,
                                           RequestHandlerClass)

        # Set the timeout of our listening socket really low so
        # that we can stop the server easily.
        self.socket.settimeout(1.0)

    def get_request(self):
        """BaseHTTPServer method, overridden."""

        request, client_address = self.socket.accept()

        # It's a loopback connection, so setting the timeout
        # really low shouldn't affect anything, but should make
        # deadlocks less likely to occur.
        request.settimeout(10.0)

        return (request, client_address)

class LoopbackHttpServerThread(threading.Thread):
    """Stoppable thread that runs a loopback http server."""

    def __init__(self, request_handler):
        threading.Thread.__init__(self)
        # Plain boolean stop flag, polled by run().
        # NOTE(review): a threading.Event would be the usual idiom here,
        # but the 1-second listening-socket timeout set by
        # LoopbackHttpServer bounds how long run() can block between
        # polls, so the flag suffices.
        self._stop = False
        # Set by run() so callers can wait until the server loop has
        # actually started before issuing requests.
        self.ready = threading.Event()
        request_handler.protocol_version = "HTTP/1.0"
        # Port 0 asks the OS for any free port; the chosen port is
        # published via self.port below.
        self.httpd = LoopbackHttpServer(('127.0.0.1', 0),
                                        request_handler)
        #print "Serving HTTP on %s port %s" % (self.httpd.server_name,
        #                                      self.httpd.server_port)
        self.port = self.httpd.server_port

    def stop(self):
        """Stops the webserver if it's currently running."""

        # Set the stop flag.
        self._stop = True

        # Wait for run() to notice the flag and exit.
        self.join()

    def run(self):
        """Thread body: serve requests one at a time until stopped."""
        self.ready.set()
        while not self._stop:
            # handle_request() returns after one request or after the
            # 1-second socket timeout, so the stop flag is re-checked
            # at least that often.
            self.httpd.handle_request()

# Authentication infrastructure

class DigestAuthHandler:
    """Handler for performing digest authentication."""

    def __init__(self):
        # Monotonic counter used to derive unique nonces.
        self._request_num = 0
        # Nonces we have issued and not yet seen used; each nonce is
        # single-use (removed on first successful match).
        self._nonces = []
        # username -> password map; empty dict disables authentication.
        self._users = {}
        self._realm_name = "Test Realm"
        self._qop = "auth"

    def set_qop(self, qop):
        """Set the quality-of-protection value advertised in challenges
        (e.g. "auth" or "auth-int")."""
        self._qop = qop

    def set_users(self, users):
        """Set the username -> password mapping used for validation."""
        assert isinstance(users, dict)
        self._users = users

    def set_realm(self, realm):
        """Set the realm name sent in the Proxy-Authenticate challenge."""
        self._realm_name = realm

    def _generate_nonce(self):
        """Return a fresh nonce (md5 of a request counter) and remember
        it so a later Proxy-Authorization header can be checked against
        it."""
        self._request_num += 1
        nonce = hashlib.md5(str(self._request_num)).hexdigest()
        self._nonces.append(nonce)
        return nonce

    def _create_auth_dict(self, auth_str):
        """Parse a "Digest k1=v1, k2="v2", ..." authorization header
        value into a dict, stripping surrounding double quotes from
        quoted values."""
        # Drop the leading scheme token (e.g. "Digest ").
        first_space_index = auth_str.find(" ")
        auth_str = auth_str[first_space_index+1:]

        parts = auth_str.split(",")

        auth_dict = {}
        for part in parts:
            # NOTE(review): split("=") assumes neither key nor value
            # contains "=" and that every part has exactly one "=";
            # adequate for the headers urllib2 generates in this test,
            # but not a general RFC 2617 parser.
            name, value = part.split("=")
            name = name.strip()
            if value[0] == '"' and value[-1] == '"':
                value = value[1:-1]
            else:
                value = value.strip()
            auth_dict[name] = value
        return auth_dict

    def _validate_auth(self, auth_dict, password, method, uri):
        """Recompute the RFC 2617 digest response from the parsed
        authorization dict plus the known password/method/uri, and
        return True iff it matches the client-supplied "response"."""
        final_dict = {}
        final_dict.update(auth_dict)
        final_dict["password"] = password
        final_dict["method"] = method
        final_dict["uri"] = uri
        # HA1 = md5(username:realm:password)
        HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
        HA1 = hashlib.md5(HA1_str).hexdigest()
        # HA2 = md5(method:uri)
        HA2_str = "%(method)s:%(uri)s" % final_dict
        HA2 = hashlib.md5(HA2_str).hexdigest()
        final_dict["HA1"] = HA1
        final_dict["HA2"] = HA2
        # response = md5(HA1:nonce:nc:cnonce:qop:HA2)  (qop-style digest)
        response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
                       "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
        response = hashlib.md5(response_str).hexdigest()

        return response == auth_dict["response"]

    def _return_auth_challenge(self, request_handler):
        """Send a 407 challenge with a fresh nonce.  Always returns
        False so callers can ``return self._return_auth_challenge(...)``
        to signal authentication failure."""
        request_handler.send_response(407, "Proxy Authentication Required")
        request_handler.send_header("Content-Type", "text/html")
        request_handler.send_header(
            'Proxy-Authenticate', 'Digest realm="%s", '
            'qop="%s",'
            'nonce="%s", ' % \
            (self._realm_name, self._qop, self._generate_nonce()))
        # XXX: Not sure if we're supposed to add this next header or
        # not.
        #request_handler.send_header('Connection', 'close')
        request_handler.end_headers()
        request_handler.wfile.write("Proxy Authentication Required.")
        return False

    def handle_request(self, request_handler):
        """Performs digest authentication on the given HTTP request
        handler.  Returns True if authentication was successful, False
        otherwise.

        If no users have been set, then digest auth is effectively
        disabled and this method will always return True.
        """

        if len(self._users) == 0:
            return True

        if 'Proxy-Authorization' not in request_handler.headers:
            return self._return_auth_challenge(request_handler)
        else:
            auth_dict = self._create_auth_dict(
                request_handler.headers['Proxy-Authorization']
                )
            if auth_dict["username"] in self._users:
                password = self._users[ auth_dict["username"] ]
            else:
                return self._return_auth_challenge(request_handler)
            # Each nonce may be used only once; unknown or replayed
            # nonces trigger a fresh challenge.
            if not auth_dict.get("nonce") in self._nonces:
                return self._return_auth_challenge(request_handler)
            else:
                self._nonces.remove(auth_dict["nonce"])

        auth_validated = False

        # MSIE uses short_path in its validation, but Python's
        # urllib2 uses the full path, so we're going to see if
        # either of them works here.

        for path in [request_handler.path, request_handler.short_path]:
            if self._validate_auth(auth_dict,
                                   password,
                                   request_handler.command,
                                   path):
                auth_validated = True

        if not auth_validated:
            return self._return_auth_challenge(request_handler)
        return True

# Proxy test infrastructure

class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """This is a 'fake proxy' that makes it look like the entire
    internet has gone down due to a sudden zombie invasion.  It main
    utility is in providing us with authentication support for
    testing.
    """

    def __init__(self, digest_auth_handler, *args, **kwargs):
        # This has to be set before calling our parent's __init__(), which will
        # try to call do_GET().
        self.digest_auth_handler = digest_auth_handler
        BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)

    def log_message(self, format, *args):
        # Uncomment the next line for debugging.
        #sys.stderr.write(format % args)
        pass

    def do_GET(self):
        (scm, netloc, path, params, query, fragment) = urlparse.urlparse(
            self.path, 'http')
        # short_path (path component only) is consulted by the digest
        # handler alongside the full request path; see handle_request().
        self.short_path = path
        if self.digest_auth_handler.handle_request(self):
            self.send_response(200, "OK")
            self.send_header("Content-Type", "text/html")
            self.end_headers()
            self.wfile.write("You've reached %s!<BR>" % self.path)
            self.wfile.write("Our apologies, but our server is down due to "
                             "a sudden zombie invasion.")

# Test cases

class BaseTestCase(unittest.TestCase):
    # Common setup/teardown: snapshot live threads before the test and
    # verify they are cleaned up afterwards.
    def setUp(self):
        self._threads = test_support.threading_setup()

    def tearDown(self):
        test_support.threading_cleanup(*self._threads)


class ProxyAuthTests(BaseTestCase):
    """Tests of urllib2.ProxyDigestAuthHandler against the fake
    digest-authenticating proxy defined above."""

    URL = "http://localhost"

    USER = "tester"
    PASSWD = "test123"
    REALM = "TestRealm"

    def setUp(self):
        super(ProxyAuthTests, self).setUp()
        self.digest_auth_handler = DigestAuthHandler()
        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
        self.digest_auth_handler.set_realm(self.REALM)
        # Closure so every FakeProxyHandler instance shares this
        # test's digest_auth_handler.
        def create_fake_proxy_handler(*args, **kwargs):
            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
        self.server.start()
        self.server.ready.wait()
        proxy_url = "http://127.0.0.1:%d" % self.server.port
        handler = urllib2.ProxyHandler({"http" : proxy_url})
        self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()
        self.opener = urllib2.build_opener(handler, self.proxy_digest_handler)

    def tearDown(self):
        self.server.stop()
        super(ProxyAuthTests, self).tearDown()

    def test_proxy_with_bad_password_raises_httperror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD+"bad")
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_with_no_password_raises_httperror(self):
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_qop_auth_works(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth")
        result = self.opener.open(self.URL)
        # Drain the response before closing.
        while result.read():
            pass
        result.close()

    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth-int")
        try:
            result = self.opener.open(self.URL)
        except urllib2.URLError:
            # It's okay if we don't support auth-int, but we certainly
            # shouldn't receive any kind of exception here other than
            # a URLError.
            result = None
        if result:
            while result.read():
                pass
            result.close()


def GetRequestHandler(responses):
    """Build a request-handler class whose replies are taken, in
    order, from *responses*: a list of (code, headers, body) tuples.
    The class records request paths / POST bodies in ``requests`` and
    the last request's headers in ``headers_received``."""

    class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):

        server_version = "TestHTTP/"
        # Class-level state shared by all requests, inspected by tests.
        requests = []
        headers_received = []
        port = 80

        def do_GET(self):
            body = self.send_head()
            if body:
                self.wfile.write(body)

        def do_POST(self):
            content_length = self.headers['Content-Length']
            post_data = self.rfile.read(int(content_length))
            # Reply exactly as for GET, then record the posted data.
            self.do_GET()
            self.requests.append(post_data)

        def send_head(self):
            """Consume the next canned response; send status and
            headers, returning the body (if any) for do_GET to write."""
            FakeHTTPRequestHandler.headers_received = self.headers
            self.requests.append(self.path)
            response_code, headers, body = responses.pop(0)

            self.send_response(response_code)

            for (header, value) in headers:
                # "%s"-style placeholders in canned header values are
                # filled with the server's actual port (set by
                # start_server), e.g. for Location redirects.
                self.send_header(header, value % self.port)
            if body:
                self.send_header('Content-type', 'text/plain')
                self.end_headers()
                return body
            self.end_headers()

        def log_message(self, *args):
            pass


    return FakeHTTPRequestHandler


class TestUrlopen(BaseTestCase):
    """Tests urllib2.urlopen using the network.

    These tests are not exhaustive.  Assuming that testing using files does a
    good job overall of some of the basic interface features.  There are no
    tests exercising the optional 'data' and 'proxies' arguments.  No tests
    for transparent redirection have been written.
    """

    def start_server(self, responses):
        """Start a loopback server that serves *responses* in order;
        return the handler class (whose ``port`` is the live port)."""
        handler = GetRequestHandler(responses)

        self.server = LoopbackHttpServerThread(handler)
        self.server.start()
        self.server.ready.wait()
        port = self.server.port
        handler.port = port
        return handler


    def test_redirection(self):
        expected_response = 'We got here...'
        responses = [
            (302, [('Location', 'http://localhost:%s/somewhere_else')], ''),
            (200, [], expected_response)
        ]

        handler = self.start_server(responses)

        try:
            f = urllib2.urlopen('http://localhost:%s/' % handler.port)
            data = f.read()
            f.close()

            self.assertEqual(data, expected_response)
            # Both the original and the redirected path must be hit.
            self.assertEqual(handler.requests, ['/', '/somewhere_else'])
        finally:
            self.server.stop()


    def test_404(self):
        expected_response = 'Bad bad bad...'
        handler = self.start_server([(404, [], expected_response)])

        try:
            try:
                urllib2.urlopen('http://localhost:%s/weeble' % handler.port)
            except urllib2.URLError, f:
                pass
            else:
                self.fail('404 should raise URLError')

            # The HTTPError (a URLError subclass) is itself readable
            # and should carry the error page body.
            data = f.read()
            f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/weeble'])
        finally:
            self.server.stop()


    def test_200(self):
        expected_response = 'pycon 2008...'
        handler = self.start_server([(200, [], expected_response)])

        try:
            f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port)
            data = f.read()
            f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/bizarre'])
        finally:
            self.server.stop()

    def test_200_with_parameters(self):
        expected_response = 'pycon 2008...'
        handler = self.start_server([(200, [], expected_response)])

        try:
            # Passing data makes this a POST; the body is recorded by
            # do_POST after the path.
            f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port, 'get=with_feeling')
            data = f.read()
            f.close()

            self.assertEqual(data, expected_response)
            self.assertEqual(handler.requests, ['/bizarre', 'get=with_feeling'])
        finally:
            self.server.stop()


    def test_sending_headers(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            req = urllib2.Request("http://localhost:%s/" % handler.port,
                                  headers={'Range': 'bytes=20-39'})
            urllib2.urlopen(req)
            self.assertEqual(handler.headers_received['Range'], 'bytes=20-39')
        finally:
            self.server.stop()

    def test_basic(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            for attr in ("read", "close", "info", "geturl"):
                self.assertTrue(hasattr(open_url, attr), "object returned from "
                             "urlopen lacks the %s attribute" % attr)
            try:
                self.assertTrue(open_url.read(), "calling 'read' failed")
            finally:
                open_url.close()
        finally:
            self.server.stop()

    def test_info(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            info_obj = open_url.info()
            self.assertIsInstance(info_obj, mimetools.Message,
                                  "object returned by 'info' is not an "
                                  "instance of mimetools.Message")
            self.assertEqual(info_obj.getsubtype(), "plain")
        finally:
            self.server.stop()

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            url = open_url.geturl()
            self.assertEqual(url, "http://localhost:%s" % handler.port)
        finally:
            self.server.stop()


    def test_bad_address(self):
        # Make sure proper exception is raised when connecting to a bogus
        # address.
        self.assertRaises(IOError,
                          # Given that both VeriSign and various ISPs have in
                          # the past or are presently hijacking various invalid
                          # domain name requests in an attempt to boost traffic
                          # to their own sites, finding a domain name to use
                          # for this test is difficult.  RFC2606 leads one to
                          # believe that '.invalid' should work, but experience
                          # seemed to indicate otherwise.  Single character
                          # TLDs are likely to remain invalid, so this seems to
                          # be the best choice.  The trailing '.' prevents a
                          # related problem: The normal DNS resolver appends
                          # the domain names from the search path if there is
                          # no '.' the end and, and if one of those domains
                          # implements a '*' rule a result is returned.
                          # However, none of this will prevent the test from
                          # failing if the ISP hijacks all invalid domain
                          # requests.  The real solution would be to be able to
                          # parameterize the framework with a mock resolver.
                          urllib2.urlopen, "http://sadflkjsasf.i.nvali.d./")

    def test_iteration(self):
        expected_response = "pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        try:
            data = urllib2.urlopen("http://localhost:%s" % handler.port)
            for line in data:
                self.assertEqual(line, expected_response)
        finally:
            self.server.stop()

    # NOTE(review): the "z" prefix keeps unittest from discovering this
    # test, i.e. it is deliberately disabled — presumably because the
    # long-line case was flaky; confirm before re-enabling.
    def ztest_line_iteration(self):
        lines = ["We\n", "got\n", "here\n", "verylong " * 8192 + "\n"]
        expected_response = "".join(lines)
        handler = self.start_server([(200, [], expected_response)])
        try:
            data = urllib2.urlopen("http://localhost:%s" % handler.port)
            for index, line in enumerate(data):
                self.assertEqual(line, lines[index],
                                 "Fetched line number %s doesn't match expected:\n"
                                 "    Expected length was %s, got %s" %
                                 (index, len(lines[index]), len(line)))
        finally:
            self.server.stop()
        self.assertEqual(index + 1, len(lines))

def test_main():
    # We will NOT depend on the network resource flag
    # (Lib/test/regrtest.py -u network) since all tests here are only
    # localhost.  However, if this is a bad rationale, then uncomment
    # the next line.
    #test_support.requires("network")

    test_support.run_unittest(ProxyAuthTests, TestUrlopen)

if __name__ == "__main__":
    test_main()