1import unittest 2from test import test_support 3from test import test_urllib 4 5import os 6import socket 7import StringIO 8 9import urllib2 10from urllib2 import Request, OpenerDirector, AbstractDigestAuthHandler 11import httplib 12 13try: 14 import ssl 15except ImportError: 16 ssl = None 17 18# XXX 19# Request 20# CacheFTPHandler (hard to write) 21# parse_keqv_list, parse_http_list, HTTPDigestAuthHandler 22 23class TrivialTests(unittest.TestCase): 24 def test_trivial(self): 25 # A couple trivial tests 26 27 self.assertRaises(ValueError, urllib2.urlopen, 'bogus url') 28 29 # XXX Name hacking to get this to work on Windows. 30 fname = os.path.abspath(urllib2.__file__).replace(os.sep, '/') 31 32 # And more hacking to get it to work on MacOS. This assumes 33 # urllib.pathname2url works, unfortunately... 34 if os.name == 'riscos': 35 import string 36 fname = os.expand(fname) 37 fname = fname.translate(string.maketrans("/.", "./")) 38 39 if os.name == 'nt': 40 file_url = "file:///%s" % fname 41 else: 42 file_url = "file://%s" % fname 43 44 f = urllib2.urlopen(file_url) 45 46 buf = f.read() 47 f.close() 48 49 def test_parse_http_list(self): 50 tests = [('a,b,c', ['a', 'b', 'c']), 51 ('path"o,l"og"i"cal, example', ['path"o,l"og"i"cal', 'example']), 52 ('a, b, "c", "d", "e,f", g, h', ['a', 'b', '"c"', '"d"', '"e,f"', 'g', 'h']), 53 ('a="b\\"c", d="e\\,f", g="h\\\\i"', ['a="b"c"', 'd="e,f"', 'g="h\\i"'])] 54 for string, list in tests: 55 self.assertEqual(urllib2.parse_http_list(string), list) 56 57 @unittest.skipUnless(ssl, "ssl module required") 58 def test_cafile_and_context(self): 59 context = ssl.create_default_context() 60 with self.assertRaises(ValueError): 61 urllib2.urlopen( 62 "https://localhost", cafile="/nonexistent/path", context=context 63 ) 64 65 66def test_request_headers_dict(): 67 """ 68 The Request.headers dictionary is not a documented interface. It should 69 stay that way, because the complete set of headers are only accessible 70 through the .get_header(), .has_header(), .header_items() interface. 71 However, .headers pre-dates those methods, and so real code will be using 72 the dictionary. 73 74 The introduction in 2.4 of those methods was a mistake for the same reason: 75 code that previously saw all (urllib2 user)-provided headers in .headers 76 now sees only a subset (and the function interface is ugly and incomplete). 77 A better change would have been to replace .headers dict with a dict 78 subclass (or UserDict.DictMixin instance?) that preserved the .headers 79 interface and also provided access to the "unredirected" headers. It's 80 probably too late to fix that, though. 81 82 83 Check .capitalize() case normalization: 84 85 >>> url = "http://example.com" 86 >>> Request(url, headers={"Spam-eggs": "blah"}).headers["Spam-eggs"] 87 'blah' 88 >>> Request(url, headers={"spam-EggS": "blah"}).headers["Spam-eggs"] 89 'blah' 90 91 Currently, Request(url, "Spam-eggs").headers["Spam-Eggs"] raises KeyError, 92 but that could be changed in future. 93 94 """ 95 96def test_request_headers_methods(): 97 """ 98 Note the case normalization of header names here, to .capitalize()-case. 99 This should be preserved for backwards-compatibility. (In the HTTP case, 100 normalization to .title()-case is done by urllib2 before sending headers to 101 httplib). 102 103 >>> url = "http://example.com" 104 >>> r = Request(url, headers={"Spam-eggs": "blah"}) 105 >>> r.has_header("Spam-eggs") 106 True 107 >>> r.header_items() 108 [('Spam-eggs', 'blah')] 109 >>> r.add_header("Foo-Bar", "baz") 110 >>> items = r.header_items() 111 >>> items.sort() 112 >>> items 113 [('Foo-bar', 'baz'), ('Spam-eggs', 'blah')] 114 115 Note that e.g. r.has_header("spam-EggS") is currently False, and 116 r.get_header("spam-EggS") returns None, but that could be changed in 117 future. 118 119 >>> r.has_header("Not-there") 120 False 121 >>> print r.get_header("Not-there") 122 None 123 >>> r.get_header("Not-there", "default") 124 'default' 125 126 """ 127 128 129def test_password_manager(self): 130 """ 131 >>> mgr = urllib2.HTTPPasswordMgr() 132 >>> add = mgr.add_password 133 >>> add("Some Realm", "http://example.com/", "joe", "password") 134 >>> add("Some Realm", "http://example.com/ni", "ni", "ni") 135 >>> add("c", "http://example.com/foo", "foo", "ni") 136 >>> add("c", "http://example.com/bar", "bar", "nini") 137 >>> add("b", "http://example.com/", "first", "blah") 138 >>> add("b", "http://example.com/", "second", "spam") 139 >>> add("a", "http://example.com", "1", "a") 140 >>> add("Some Realm", "http://c.example.com:3128", "3", "c") 141 >>> add("Some Realm", "d.example.com", "4", "d") 142 >>> add("Some Realm", "e.example.com:3128", "5", "e") 143 144 >>> mgr.find_user_password("Some Realm", "example.com") 145 ('joe', 'password') 146 >>> mgr.find_user_password("Some Realm", "http://example.com") 147 ('joe', 'password') 148 >>> mgr.find_user_password("Some Realm", "http://example.com/") 149 ('joe', 'password') 150 >>> mgr.find_user_password("Some Realm", "http://example.com/spam") 151 ('joe', 'password') 152 >>> mgr.find_user_password("Some Realm", "http://example.com/spam/spam") 153 ('joe', 'password') 154 >>> mgr.find_user_password("c", "http://example.com/foo") 155 ('foo', 'ni') 156 >>> mgr.find_user_password("c", "http://example.com/bar") 157 ('bar', 'nini') 158 159 Actually, this is really undefined ATM 160## Currently, we use the highest-level path where more than one match: 161 162## >>> mgr.find_user_password("Some Realm", "http://example.com/ni") 163## ('joe', 'password') 164 165 Use latest add_password() in case of conflict: 166 167 >>> mgr.find_user_password("b", "http://example.com/") 168 ('second', 'spam') 169 170 No special relationship between a.example.com and example.com: 171 172 >>> mgr.find_user_password("a", "http://example.com/") 173 ('1', 'a') 174 >>> mgr.find_user_password("a", "http://a.example.com/") 175 (None, None) 176 177 Ports: 178 179 >>> mgr.find_user_password("Some Realm", "c.example.com") 180 (None, None) 181 >>> mgr.find_user_password("Some Realm", "c.example.com:3128") 182 ('3', 'c') 183 >>> mgr.find_user_password("Some Realm", "http://c.example.com:3128") 184 ('3', 'c') 185 >>> mgr.find_user_password("Some Realm", "d.example.com") 186 ('4', 'd') 187 >>> mgr.find_user_password("Some Realm", "e.example.com:3128") 188 ('5', 'e') 189 190 """ 191 pass 192 193 194def test_password_manager_default_port(self): 195 """ 196 >>> mgr = urllib2.HTTPPasswordMgr() 197 >>> add = mgr.add_password 198 199 The point to note here is that we can't guess the default port if there's 200 no scheme. This applies to both add_password and find_user_password. 201 202 >>> add("f", "http://g.example.com:80", "10", "j") 203 >>> add("g", "http://h.example.com", "11", "k") 204 >>> add("h", "i.example.com:80", "12", "l") 205 >>> add("i", "j.example.com", "13", "m") 206 >>> mgr.find_user_password("f", "g.example.com:100") 207 (None, None) 208 >>> mgr.find_user_password("f", "g.example.com:80") 209 ('10', 'j') 210 >>> mgr.find_user_password("f", "g.example.com") 211 (None, None) 212 >>> mgr.find_user_password("f", "http://g.example.com:100") 213 (None, None) 214 >>> mgr.find_user_password("f", "http://g.example.com:80") 215 ('10', 'j') 216 >>> mgr.find_user_password("f", "http://g.example.com") 217 ('10', 'j') 218 >>> mgr.find_user_password("g", "h.example.com") 219 ('11', 'k') 220 >>> mgr.find_user_password("g", "h.example.com:80") 221 ('11', 'k') 222 >>> mgr.find_user_password("g", "http://h.example.com:80") 223 ('11', 'k') 224 >>> mgr.find_user_password("h", "i.example.com") 225 (None, None) 226 >>> mgr.find_user_password("h", "i.example.com:80") 227 ('12', 'l') 228 >>> mgr.find_user_password("h", "http://i.example.com:80") 229 ('12', 'l') 230 >>> mgr.find_user_password("i", "j.example.com") 231 ('13', 'm') 232 >>> mgr.find_user_password("i", "j.example.com:80") 233 (None, None) 234 >>> mgr.find_user_password("i", "http://j.example.com") 235 ('13', 'm') 236 >>> mgr.find_user_password("i", "http://j.example.com:80") 237 (None, None) 238 239 """ 240 241class MockOpener: 242 addheaders = [] 243 def open(self, req, data=None,timeout=socket._GLOBAL_DEFAULT_TIMEOUT): 244 self.req, self.data, self.timeout = req, data, timeout 245 def error(self, proto, *args): 246 self.proto, self.args = proto, args 247 248class MockFile: 249 def read(self, count=None): pass 250 def readline(self, count=None): pass 251 def close(self): pass 252 253class MockHeaders(dict): 254 def getheaders(self, name): 255 return self.values() 256 257class MockResponse(StringIO.StringIO): 258 def __init__(self, code, msg, headers, data, url=None): 259 StringIO.StringIO.__init__(self, data) 260 self.code, self.msg, self.headers, self.url = code, msg, headers, url 261 def info(self): 262 return self.headers 263 def geturl(self): 264 return self.url 265 266class MockCookieJar: 267 def add_cookie_header(self, request): 268 self.ach_req = request 269 def extract_cookies(self, response, request): 270 self.ec_req, self.ec_r = request, response 271 272class FakeMethod: 273 def __init__(self, meth_name, action, handle): 274 self.meth_name = meth_name 275 self.handle = handle 276 self.action = action 277 def __call__(self, *args): 278 return self.handle(self.meth_name, self.action, *args) 279 280class MockHTTPResponse: 281 def __init__(self, fp, msg, status, reason): 282 self.fp = fp 283 self.msg = msg 284 self.status = status 285 self.reason = reason 286 def read(self): 287 return '' 288 289class MockHTTPClass: 290 def __init__(self): 291 self.req_headers = [] 292 self.data = None 293 self.raise_on_endheaders = False 294 self._tunnel_headers = {} 295 296 def __call__(self, host, timeout=socket._GLOBAL_DEFAULT_TIMEOUT): 297 self.host = host 298 self.timeout = timeout 299 return self 300 301 def set_debuglevel(self, level): 302 self.level = level 303 304 def set_tunnel(self, host, port=None, headers=None): 305 self._tunnel_host = host 306 self._tunnel_port = port 307 if headers: 308 self._tunnel_headers = headers 309 else: 310 self._tunnel_headers.clear() 311 312 def request(self, method, url, body=None, headers=None): 313 self.method = method 314 self.selector = url 315 if headers is not None: 316 self.req_headers += headers.items() 317 self.req_headers.sort() 318 if body: 319 self.data = body 320 if self.raise_on_endheaders: 321 import socket 322 raise socket.error() 323 324 def getresponse(self): 325 return MockHTTPResponse(MockFile(), {}, 200, "OK") 326 327 def close(self): 328 pass 329 330class MockHandler: 331 # useful for testing handler machinery 332 # see add_ordered_mock_handlers() docstring 333 handler_order = 500 334 def __init__(self, methods): 335 self._define_methods(methods) 336 def _define_methods(self, methods): 337 for spec in methods: 338 if len(spec) == 2: name, action = spec 339 else: name, action = spec, None 340 meth = FakeMethod(name, action, self.handle) 341 setattr(self.__class__, name, meth) 342 def handle(self, fn_name, action, *args, **kwds): 343 self.parent.calls.append((self, fn_name, args, kwds)) 344 if action is None: 345 return None 346 elif action == "return self": 347 return self 348 elif action == "return response": 349 res = MockResponse(200, "OK", {}, "") 350 return res 351 elif action == "return request": 352 return Request("http://blah/") 353 elif action.startswith("error"): 354 code = action[action.rfind(" ")+1:] 355 try: 356 code = int(code) 357 except ValueError: 358 pass 359 res = MockResponse(200, "OK", {}, "") 360 return self.parent.error("http", args[0], res, code, "", {}) 361 elif action == "raise": 362 raise urllib2.URLError("blah") 363 assert False 364 def close(self): pass 365 def add_parent(self, parent): 366 self.parent = parent 367 self.parent.calls = [] 368 def __lt__(self, other): 369 if not hasattr(other, "handler_order"): 370 # No handler_order, leave in original order. Yuck. 371 return True 372 return self.handler_order < other.handler_order 373 374def add_ordered_mock_handlers(opener, meth_spec): 375 """Create MockHandlers and add them to an OpenerDirector. 376 377 meth_spec: list of lists of tuples and strings defining methods to define 378 on handlers. eg: 379 380 [["http_error", "ftp_open"], ["http_open"]] 381 382 defines methods .http_error() and .ftp_open() on one handler, and 383 .http_open() on another. These methods just record their arguments and 384 return None. Using a tuple instead of a string causes the method to 385 perform some action (see MockHandler.handle()), eg: 386 387 [["http_error"], [("http_open", "return request")]] 388 389 defines .http_error() on one handler (which simply returns None), and 390 .http_open() on another handler, which returns a Request object. 391 392 """ 393 handlers = [] 394 count = 0 395 for meths in meth_spec: 396 class MockHandlerSubclass(MockHandler): pass 397 h = MockHandlerSubclass(meths) 398 h.handler_order += count 399 h.add_parent(opener) 400 count = count + 1 401 handlers.append(h) 402 opener.add_handler(h) 403 return handlers 404 405def build_test_opener(*handler_instances): 406 opener = OpenerDirector() 407 for h in handler_instances: 408 opener.add_handler(h) 409 return opener 410 411class MockHTTPHandler(urllib2.BaseHandler): 412 # useful for testing redirections and auth 413 # sends supplied headers and code as first response 414 # sends 200 OK as second response 415 def __init__(self, code, headers): 416 self.code = code 417 self.headers = headers 418 self.reset() 419 def reset(self): 420 self._count = 0 421 self.requests = [] 422 def http_open(self, req): 423 import mimetools, copy 424 from StringIO import StringIO 425 self.requests.append(copy.deepcopy(req)) 426 if self._count == 0: 427 self._count = self._count + 1 428 name = httplib.responses[self.code] 429 msg = mimetools.Message(StringIO(self.headers)) 430 return self.parent.error( 431 "http", req, MockFile(), self.code, name, msg) 432 else: 433 self.req = req 434 msg = mimetools.Message(StringIO("\r\n\r\n")) 435 return MockResponse(200, "OK", msg, "", req.get_full_url()) 436 437class MockHTTPSHandler(urllib2.AbstractHTTPHandler): 438 # Useful for testing the Proxy-Authorization request by verifying the 439 # properties of httpcon 440 441 def __init__(self): 442 urllib2.AbstractHTTPHandler.__init__(self) 443 self.httpconn = MockHTTPClass() 444 445 def https_open(self, req): 446 return self.do_open(self.httpconn, req) 447 448class MockPasswordManager: 449 def add_password(self, realm, uri, user, password): 450 self.realm = realm 451 self.url = uri 452 self.user = user 453 self.password = password 454 def find_user_password(self, realm, authuri): 455 self.target_realm = realm 456 self.target_url = authuri 457 return self.user, self.password 458 459 460class OpenerDirectorTests(unittest.TestCase): 461 462 def test_add_non_handler(self): 463 class NonHandler(object): 464 pass 465 self.assertRaises(TypeError, 466 OpenerDirector().add_handler, NonHandler()) 467 468 def test_badly_named_methods(self): 469 # test work-around for three methods that accidentally follow the 470 # naming conventions for handler methods 471 # (*_open() / *_request() / *_response()) 472 473 # These used to call the accidentally-named methods, causing a 474 # TypeError in real code; here, returning self from these mock 475 # methods would either cause no exception, or AttributeError. 476 477 from urllib2 import URLError 478 479 o = OpenerDirector() 480 meth_spec = [ 481 [("do_open", "return self"), ("proxy_open", "return self")], 482 [("redirect_request", "return self")], 483 ] 484 handlers = add_ordered_mock_handlers(o, meth_spec) 485 o.add_handler(urllib2.UnknownHandler()) 486 for scheme in "do", "proxy", "redirect": 487 self.assertRaises(URLError, o.open, scheme+"://example.com/") 488 489 def test_handled(self): 490 # handler returning non-None means no more handlers will be called 491 o = OpenerDirector() 492 meth_spec = [ 493 ["http_open", "ftp_open", "http_error_302"], 494 ["ftp_open"], 495 [("http_open", "return self")], 496 [("http_open", "return self")], 497 ] 498 handlers = add_ordered_mock_handlers(o, meth_spec) 499 500 req = Request("http://example.com/") 501 r = o.open(req) 502 # Second .http_open() gets called, third doesn't, since second returned 503 # non-None. Handlers without .http_open() never get any methods called 504 # on them. 505 # In fact, second mock handler defining .http_open() returns self 506 # (instead of response), which becomes the OpenerDirector's return 507 # value. 508 self.assertEqual(r, handlers[2]) 509 calls = [(handlers[0], "http_open"), (handlers[2], "http_open")] 510 for expected, got in zip(calls, o.calls): 511 handler, name, args, kwds = got 512 self.assertEqual((handler, name), expected) 513 self.assertEqual(args, (req,)) 514 515 def test_handler_order(self): 516 o = OpenerDirector() 517 handlers = [] 518 for meths, handler_order in [ 519 ([("http_open", "return self")], 500), 520 (["http_open"], 0), 521 ]: 522 class MockHandlerSubclass(MockHandler): pass 523 h = MockHandlerSubclass(meths) 524 h.handler_order = handler_order 525 handlers.append(h) 526 o.add_handler(h) 527 528 r = o.open("http://example.com/") 529 # handlers called in reverse order, thanks to their sort order 530 self.assertEqual(o.calls[0][0], handlers[1]) 531 self.assertEqual(o.calls[1][0], handlers[0]) 532 533 def test_raise(self): 534 # raising URLError stops processing of request 535 o = OpenerDirector() 536 meth_spec = [ 537 [("http_open", "raise")], 538 [("http_open", "return self")], 539 ] 540 handlers = add_ordered_mock_handlers(o, meth_spec) 541 542 req = Request("http://example.com/") 543 self.assertRaises(urllib2.URLError, o.open, req) 544 self.assertEqual(o.calls, [(handlers[0], "http_open", (req,), {})]) 545 546## def test_error(self): 547## # XXX this doesn't actually seem to be used in standard library, 548## # but should really be tested anyway... 549 550 def test_http_error(self): 551 # XXX http_error_default 552 # http errors are a special case 553 o = OpenerDirector() 554 meth_spec = [ 555 [("http_open", "error 302")], 556 [("http_error_400", "raise"), "http_open"], 557 [("http_error_302", "return response"), "http_error_303", 558 "http_error"], 559 [("http_error_302")], 560 ] 561 handlers = add_ordered_mock_handlers(o, meth_spec) 562 563 class Unknown: 564 def __eq__(self, other): return True 565 566 req = Request("http://example.com/") 567 r = o.open(req) 568 assert len(o.calls) == 2 569 calls = [(handlers[0], "http_open", (req,)), 570 (handlers[2], "http_error_302", 571 (req, Unknown(), 302, "", {}))] 572 for expected, got in zip(calls, o.calls): 573 handler, method_name, args = expected 574 self.assertEqual((handler, method_name), got[:2]) 575 self.assertEqual(args, got[2]) 576 577 def test_processors(self): 578 # *_request / *_response methods get called appropriately 579 o = OpenerDirector() 580 meth_spec = [ 581 [("http_request", "return request"), 582 ("http_response", "return response")], 583 [("http_request", "return request"), 584 ("http_response", "return response")], 585 ] 586 handlers = add_ordered_mock_handlers(o, meth_spec) 587 588 req = Request("http://example.com/") 589 r = o.open(req) 590 # processor methods are called on *all* handlers that define them, 591 # not just the first handler that handles the request 592 calls = [ 593 (handlers[0], "http_request"), (handlers[1], "http_request"), 594 (handlers[0], "http_response"), (handlers[1], "http_response")] 595 596 for i, (handler, name, args, kwds) in enumerate(o.calls): 597 if i < 2: 598 # *_request 599 self.assertEqual((handler, name), calls[i]) 600 self.assertEqual(len(args), 1) 601 self.assertIsInstance(args[0], Request) 602 else: 603 # *_response 604 self.assertEqual((handler, name), calls[i]) 605 self.assertEqual(len(args), 2) 606 self.assertIsInstance(args[0], Request) 607 # response from opener.open is None, because there's no 608 # handler that defines http_open to handle it 609 if args[1] is not None: 610 self.assertIsInstance(args[1], MockResponse) 611 612 613def sanepathname2url(path): 614 import urllib 615 urlpath = urllib.pathname2url(path) 616 if os.name == "nt" and urlpath.startswith("///"): 617 urlpath = urlpath[2:] 618 # XXX don't ask me about the mac... 619 return urlpath 620 621class HandlerTests(unittest.TestCase): 622 623 def test_ftp(self): 624 class MockFTPWrapper: 625 def __init__(self, data): self.data = data 626 def retrfile(self, filename, filetype): 627 self.filename, self.filetype = filename, filetype 628 return StringIO.StringIO(self.data), len(self.data) 629 def close(self): pass 630 631 class NullFTPHandler(urllib2.FTPHandler): 632 def __init__(self, data): self.data = data 633 def connect_ftp(self, user, passwd, host, port, dirs, 634 timeout=socket._GLOBAL_DEFAULT_TIMEOUT): 635 self.user, self.passwd = user, passwd 636 self.host, self.port = host, port 637 self.dirs = dirs 638 self.ftpwrapper = MockFTPWrapper(self.data) 639 return self.ftpwrapper 640 641 import ftplib 642 data = "rheum rhaponicum" 643 h = NullFTPHandler(data) 644 o = h.parent = MockOpener() 645 646 for url, host, port, user, passwd, type_, dirs, filename, mimetype in [ 647 ("ftp://localhost/foo/bar/baz.html", 648 "localhost", ftplib.FTP_PORT, "", "", "I", 649 ["foo", "bar"], "baz.html", "text/html"), 650 ("ftp://parrot@localhost/foo/bar/baz.html", 651 "localhost", ftplib.FTP_PORT, "parrot", "", "I", 652 ["foo", "bar"], "baz.html", "text/html"), 653 ("ftp://%25parrot@localhost/foo/bar/baz.html", 654 "localhost", ftplib.FTP_PORT, "%parrot", "", "I", 655 ["foo", "bar"], "baz.html", "text/html"), 656 ("ftp://%2542parrot@localhost/foo/bar/baz.html", 657 "localhost", ftplib.FTP_PORT, "%42parrot", "", "I", 658 ["foo", "bar"], "baz.html", "text/html"), 659 ("ftp://localhost:80/foo/bar/", 660 "localhost", 80, "", "", "D", 661 ["foo", "bar"], "", None), 662 ("ftp://localhost/baz.gif;type=a", 663 "localhost", ftplib.FTP_PORT, "", "", "A", 664 [], "baz.gif", None), # XXX really this should guess image/gif 665 ]: 666 req = Request(url) 667 req.timeout = None 668 r = h.ftp_open(req) 669 # ftp authentication not yet implemented by FTPHandler 670 self.assertEqual(h.user, user) 671 self.assertEqual(h.passwd, passwd) 672 self.assertEqual(h.host, socket.gethostbyname(host)) 673 self.assertEqual(h.port, port) 674 self.assertEqual(h.dirs, dirs) 675 self.assertEqual(h.ftpwrapper.filename, filename) 676 self.assertEqual(h.ftpwrapper.filetype, type_) 677 headers = r.info() 678 self.assertEqual(headers.get("Content-type"), mimetype) 679 self.assertEqual(int(headers["Content-length"]), len(data)) 680 681 def test_file(self): 682 import rfc822, socket 683 h = urllib2.FileHandler() 684 o = h.parent = MockOpener() 685 686 TESTFN = test_support.TESTFN 687 urlpath = sanepathname2url(os.path.abspath(TESTFN)) 688 towrite = "hello, world\n" 689 urls = [ 690 "file://localhost%s" % urlpath, 691 "file://%s" % urlpath, 692 "file://%s%s" % (socket.gethostbyname('localhost'), urlpath), 693 ] 694 try: 695 localaddr = socket.gethostbyname(socket.gethostname()) 696 except socket.gaierror: 697 localaddr = '' 698 if localaddr: 699 urls.append("file://%s%s" % (localaddr, urlpath)) 700 701 for url in urls: 702 f = open(TESTFN, "wb") 703 try: 704 try: 705 f.write(towrite) 706 finally: 707 f.close() 708 709 r = h.file_open(Request(url)) 710 try: 711 data = r.read() 712 headers = r.info() 713 respurl = r.geturl() 714 finally: 715 r.close() 716 stats = os.stat(TESTFN) 717 modified = rfc822.formatdate(stats.st_mtime) 718 finally: 719 os.remove(TESTFN) 720 self.assertEqual(data, towrite) 721 self.assertEqual(headers["Content-type"], "text/plain") 722 self.assertEqual(headers["Content-length"], "13") 723 self.assertEqual(headers["Last-modified"], modified) 724 self.assertEqual(respurl, url) 725 726 for url in [ 727 "file://localhost:80%s" % urlpath, 728 "file:///file_does_not_exist.txt", 729 "file://%s:80%s/%s" % (socket.gethostbyname('localhost'), 730 os.getcwd(), TESTFN), 731 "file://somerandomhost.ontheinternet.com%s/%s" % 732 (os.getcwd(), TESTFN), 733 ]: 734 try: 735 f = open(TESTFN, "wb") 736 try: 737 f.write(towrite) 738 finally: 739 f.close() 740 741 self.assertRaises(urllib2.URLError, 742 h.file_open, Request(url)) 743 finally: 744 os.remove(TESTFN) 745 746 h = urllib2.FileHandler() 747 o = h.parent = MockOpener() 748 # XXXX why does // mean ftp (and /// mean not ftp!), and where 749 # is file: scheme specified? I think this is really a bug, and 750 # what was intended was to distinguish between URLs like: 751 # file:/blah.txt (a file) 752 # file://localhost/blah.txt (a file) 753 # file:///blah.txt (a file) 754 # file://ftp.example.com/blah.txt (an ftp URL) 755 for url, ftp in [ 756 ("file://ftp.example.com//foo.txt", True), 757 ("file://ftp.example.com///foo.txt", False), 758# XXXX bug: fails with OSError, should be URLError 759 ("file://ftp.example.com/foo.txt", False), 760 ("file://somehost//foo/something.txt", True), 761 ("file://localhost//foo/something.txt", False), 762 ]: 763 req = Request(url) 764 try: 765 h.file_open(req) 766 # XXXX remove OSError when bug fixed 767 except (urllib2.URLError, OSError): 768 self.assertTrue(not ftp) 769 else: 770 self.assertTrue(o.req is req) 771 self.assertEqual(req.type, "ftp") 772 self.assertEqual(req.type == "ftp", ftp) 773 774 def test_http(self): 775 776 h = urllib2.AbstractHTTPHandler() 777 o = h.parent = MockOpener() 778 779 url = "http://example.com/" 780 for method, data in [("GET", None), ("POST", "blah")]: 781 req = Request(url, data, {"Foo": "bar"}) 782 req.timeout = None 783 req.add_unredirected_header("Spam", "eggs") 784 http = MockHTTPClass() 785 r = h.do_open(http, req) 786 787 # result attributes 788 r.read; r.readline # wrapped MockFile methods 789 r.info; r.geturl # addinfourl methods 790 r.code, r.msg == 200, "OK" # added from MockHTTPClass.getreply() 791 hdrs = r.info() 792 hdrs.get; hdrs.has_key # r.info() gives dict from .getreply() 793 self.assertEqual(r.geturl(), url) 794 795 self.assertEqual(http.host, "example.com") 796 self.assertEqual(http.level, 0) 797 self.assertEqual(http.method, method) 798 self.assertEqual(http.selector, "/") 799 self.assertEqual(http.req_headers, 800 [("Connection", "close"), 801 ("Foo", "bar"), ("Spam", "eggs")]) 802 self.assertEqual(http.data, data) 803 804 # check socket.error converted to URLError 805 http.raise_on_endheaders = True 806 self.assertRaises(urllib2.URLError, h.do_open, http, req) 807 808 # check adding of standard headers 809 o.addheaders = [("Spam", "eggs")] 810 for data in "", None: # POST, GET 811 req = Request("http://example.com/", data) 812 r = MockResponse(200, "OK", {}, "") 813 newreq = h.do_request_(req) 814 if data is None: # GET 815 self.assertNotIn("Content-length", req.unredirected_hdrs) 816 self.assertNotIn("Content-type", req.unredirected_hdrs) 817 else: # POST 818 self.assertEqual(req.unredirected_hdrs["Content-length"], "0") 819 self.assertEqual(req.unredirected_hdrs["Content-type"], 820 "application/x-www-form-urlencoded") 821 # XXX the details of Host could be better tested 822 self.assertEqual(req.unredirected_hdrs["Host"], "example.com") 823 self.assertEqual(req.unredirected_hdrs["Spam"], "eggs") 824 825 # don't clobber existing headers 826 req.add_unredirected_header("Content-length", "foo") 827 req.add_unredirected_header("Content-type", "bar") 828 req.add_unredirected_header("Host", "baz") 829 req.add_unredirected_header("Spam", "foo") 830 newreq = h.do_request_(req) 831 self.assertEqual(req.unredirected_hdrs["Content-length"], "foo") 832 self.assertEqual(req.unredirected_hdrs["Content-type"], "bar") 833 self.assertEqual(req.unredirected_hdrs["Host"], "baz") 834 self.assertEqual(req.unredirected_hdrs["Spam"], "foo") 835 836 def test_http_doubleslash(self): 837 # Checks that the presence of an unnecessary double slash in a url doesn't break anything 838 # Previously, a double slash directly after the host could cause incorrect parsing of the url 839 h = urllib2.AbstractHTTPHandler() 840 o = h.parent = MockOpener() 841 842 data = "" 843 ds_urls = [ 844 "http://example.com/foo/bar/baz.html", 845 "http://example.com//foo/bar/baz.html", 846 "http://example.com/foo//bar/baz.html", 847 "http://example.com/foo/bar//baz.html", 848 ] 849 850 for ds_url in ds_urls: 851 ds_req = Request(ds_url, data) 852 853 # Check whether host is determined correctly if there is no proxy 854 np_ds_req = h.do_request_(ds_req) 855 self.assertEqual(np_ds_req.unredirected_hdrs["Host"],"example.com") 856 857 # Check whether host is determined correctly if there is a proxy 858 ds_req.set_proxy("someproxy:3128",None) 859 p_ds_req = h.do_request_(ds_req) 860 self.assertEqual(p_ds_req.unredirected_hdrs["Host"],"example.com") 861 862 def test_fixpath_in_weirdurls(self): 863 # Issue4493: urllib2 to supply '/' when to urls where path does not 864 # start with'/' 865 866 h = urllib2.AbstractHTTPHandler() 867 o = h.parent = MockOpener() 868 869 weird_url = 'http://www.python.org?getspam' 870 req = Request(weird_url) 871 newreq = h.do_request_(req) 872 self.assertEqual(newreq.get_host(),'www.python.org') 873 self.assertEqual(newreq.get_selector(),'/?getspam') 874 875 url_without_path = 'http://www.python.org' 876 req = Request(url_without_path) 877 newreq = h.do_request_(req) 878 self.assertEqual(newreq.get_host(),'www.python.org') 879 self.assertEqual(newreq.get_selector(),'') 880 881 def test_errors(self): 882 h = urllib2.HTTPErrorProcessor() 883 o = h.parent = MockOpener() 884 885 url = "http://example.com/" 886 req = Request(url) 887 # all 2xx are passed through 888 r = MockResponse(200, "OK", {}, "", url) 889 newr = h.http_response(req, r) 890 self.assertTrue(r is newr) 891 self.assertTrue(not hasattr(o, "proto")) # o.error not called 892 r = MockResponse(202, "Accepted", {}, "", url) 893 newr = h.http_response(req, r) 894 self.assertTrue(r is newr) 895 self.assertTrue(not hasattr(o, "proto")) # o.error not called 896 r = MockResponse(206, "Partial content", {}, "", url) 897 newr = h.http_response(req, r) 898 self.assertTrue(r is newr) 899 self.assertTrue(not hasattr(o, "proto")) # o.error not called 900 # anything else calls o.error (and MockOpener returns None, here) 901 r = MockResponse(502, "Bad gateway", {}, "", url) 902 self.assertTrue(h.http_response(req, r) is None) 903 self.assertEqual(o.proto, "http") # o.error called 904 self.assertEqual(o.args, (req, r, 502, "Bad gateway", {})) 905 906 def test_cookies(self): 907 cj = MockCookieJar() 908 h = urllib2.HTTPCookieProcessor(cj) 909 o = h.parent = MockOpener() 910 911 req = Request("http://example.com/") 912 r = MockResponse(200, "OK", {}, "") 913 newreq = h.http_request(req) 914 self.assertTrue(cj.ach_req is req is newreq) 915 self.assertEqual(req.get_origin_req_host(), "example.com") 916 self.assertTrue(not req.is_unverifiable()) 917 newr = h.http_response(req, r) 918 self.assertTrue(cj.ec_req is req) 919 self.assertTrue(cj.ec_r is r is newr) 920 921 def test_redirect(self): 922 from_url = "http://example.com/a.html" 923 to_url = "http://example.com/b.html" 924 h = urllib2.HTTPRedirectHandler() 925 o = h.parent = MockOpener() 926 927 # ordinary redirect behaviour 928 for code in 301, 302, 303, 307: 929 for data in None, "blah\nblah\n": 930 method = getattr(h, "http_error_%s" % code) 931 req = Request(from_url, data) 932 req.add_header("Nonsense", "viking=withhold") 933 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 934 if data is not None: 935 req.add_header("Content-Length", str(len(data))) 936 req.add_unredirected_header("Spam", "spam") 937 try: 938 method(req, MockFile(), code, "Blah", 939 MockHeaders({"location": to_url})) 940 except urllib2.HTTPError: 941 # 307 in response to POST requires user OK 942 self.assertEqual(code, 307) 943 self.assertIsNotNone(data) 944 self.assertEqual(o.req.get_full_url(), to_url) 945 try: 946 self.assertEqual(o.req.get_method(), "GET") 947 except AttributeError: 948 self.assertTrue(not o.req.has_data()) 949 950 # now it's a GET, there should not be headers regarding content 951 # (possibly dragged from before being a POST) 952 headers = [x.lower() for x in o.req.headers] 953 self.assertNotIn("content-length", headers) 954 self.assertNotIn("content-type", headers) 955 956 self.assertEqual(o.req.headers["Nonsense"], 957 "viking=withhold") 958 self.assertNotIn("Spam", o.req.headers) 959 self.assertNotIn("Spam", o.req.unredirected_hdrs) 960 961 # loop detection 962 req = Request(from_url) 963 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 964 def redirect(h, req, url=to_url): 965 h.http_error_302(req, MockFile(), 302, "Blah", 966 MockHeaders({"location": url})) 967 # Note that the *original* request shares the same record of 968 # redirections with the sub-requests caused by the redirections. 969 970 # detect infinite loop redirect of a URL to itself 971 req = Request(from_url, origin_req_host="example.com") 972 count = 0 973 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 974 try: 975 while 1: 976 redirect(h, req, "http://example.com/") 977 count = count + 1 978 except urllib2.HTTPError: 979 # don't stop until max_repeats, because cookies may introduce state 980 self.assertEqual(count, urllib2.HTTPRedirectHandler.max_repeats) 981 982 # detect endless non-repeating chain of redirects 983 req = Request(from_url, origin_req_host="example.com") 984 count = 0 985 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 986 try: 987 while 1: 988 redirect(h, req, "http://example.com/%d" % count) 989 count = count + 1 990 except urllib2.HTTPError: 991 self.assertEqual(count, 992 urllib2.HTTPRedirectHandler.max_redirections) 993 994 def test_invalid_redirect(self): 995 from_url = "http://example.com/a.html" 996 valid_schemes = ['http', 'https', 'ftp'] 997 invalid_schemes = ['file', 'imap', 'ldap'] 998 schemeless_url = "example.com/b.html" 999 h = urllib2.HTTPRedirectHandler() 1000 o = h.parent = MockOpener() 1001 req = Request(from_url) 1002 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 1003 1004 for scheme in invalid_schemes: 1005 invalid_url = scheme + '://' + schemeless_url 1006 self.assertRaises(urllib2.HTTPError, h.http_error_302, 1007 req, MockFile(), 302, "Security Loophole", 1008 MockHeaders({"location": invalid_url})) 1009 1010 for scheme in valid_schemes: 1011 valid_url = scheme + '://' + schemeless_url 1012 h.http_error_302(req, MockFile(), 302, "That's fine", 1013 MockHeaders({"location": valid_url})) 1014 self.assertEqual(o.req.get_full_url(), valid_url) 1015 1016 def test_cookie_redirect(self): 1017 # cookies shouldn't leak into redirected requests 1018 from cookielib import CookieJar 1019 1020 from test.test_cookielib import interact_netscape 1021 1022 cj = CookieJar() 1023 interact_netscape(cj, "http://www.example.com/", "spam=eggs") 1024 hh = MockHTTPHandler(302, "Location: http://www.cracker.com/\r\n\r\n") 1025 hdeh = urllib2.HTTPDefaultErrorHandler() 1026 hrh = urllib2.HTTPRedirectHandler() 1027 cp = urllib2.HTTPCookieProcessor(cj) 1028 o = build_test_opener(hh, hdeh, hrh, cp) 1029 o.open("http://www.example.com/") 1030 self.assertTrue(not hh.req.has_header("Cookie")) 1031 1032 def test_redirect_fragment(self): 1033 redirected_url = 'http://www.example.com/index.html#OK\r\n\r\n' 1034 hh = MockHTTPHandler(302, 'Location: ' + redirected_url) 1035 hdeh = urllib2.HTTPDefaultErrorHandler() 1036 hrh = urllib2.HTTPRedirectHandler() 1037 o = build_test_opener(hh, hdeh, hrh) 1038 fp = o.open('http://www.example.com') 1039 self.assertEqual(fp.geturl(), redirected_url.strip()) 1040 1041 def test_redirect_no_path(self): 1042 # Issue 14132: Relative redirect strips original path 1043 real_class = httplib.HTTPConnection 1044 response1 = b"HTTP/1.1 302 Found\r\nLocation: ?query\r\n\r\n" 1045 httplib.HTTPConnection = test_urllib.fakehttp(response1) 1046 self.addCleanup(setattr, httplib, "HTTPConnection", real_class) 1047 urls = iter(("/path", "/path?query")) 1048 def request(conn, method, url, *pos, **kw): 1049 self.assertEqual(url, next(urls)) 1050 real_class.request(conn, method, url, *pos, **kw) 1051 # Change response for subsequent connection 1052 conn.__class__.fakedata = b"HTTP/1.1 200 OK\r\n\r\nHello!" 1053 httplib.HTTPConnection.request = request 1054 fp = urllib2.urlopen("http://python.org/path") 1055 self.assertEqual(fp.geturl(), "http://python.org/path?query") 1056 1057 def test_proxy(self): 1058 o = OpenerDirector() 1059 ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128")) 1060 o.add_handler(ph) 1061 meth_spec = [ 1062 [("http_open", "return response")] 1063 ] 1064 handlers = add_ordered_mock_handlers(o, meth_spec) 1065 1066 req = Request("http://acme.example.com/") 1067 self.assertEqual(req.get_host(), "acme.example.com") 1068 r = o.open(req) 1069 self.assertEqual(req.get_host(), "proxy.example.com:3128") 1070 1071 self.assertEqual([(handlers[0], "http_open")], 1072 [tup[0:2] for tup in o.calls]) 1073 1074 def test_proxy_no_proxy(self): 1075 os.environ['no_proxy'] = 'python.org' 1076 o = OpenerDirector() 1077 ph = urllib2.ProxyHandler(dict(http="proxy.example.com")) 1078 o.add_handler(ph) 1079 req = Request("http://www.perl.org/") 1080 self.assertEqual(req.get_host(), "www.perl.org") 1081 r = o.open(req) 1082 self.assertEqual(req.get_host(), "proxy.example.com") 1083 req = Request("http://www.python.org") 1084 self.assertEqual(req.get_host(), "www.python.org") 1085 r = o.open(req) 1086 self.assertEqual(req.get_host(), "www.python.org") 1087 del os.environ['no_proxy'] 1088 1089 1090 def test_proxy_https(self): 1091 o = OpenerDirector() 1092 ph = urllib2.ProxyHandler(dict(https='proxy.example.com:3128')) 1093 o.add_handler(ph) 1094 meth_spec = [ 1095 [("https_open","return response")] 1096 ] 1097 handlers = add_ordered_mock_handlers(o, meth_spec) 1098 req = Request("https://www.example.com/") 1099 self.assertEqual(req.get_host(), "www.example.com") 1100 r = o.open(req) 1101 self.assertEqual(req.get_host(), "proxy.example.com:3128") 1102 self.assertEqual([(handlers[0], "https_open")], 1103 [tup[0:2] for tup in o.calls]) 1104 1105 def test_proxy_https_proxy_authorization(self): 1106 o = OpenerDirector() 1107 ph = urllib2.ProxyHandler(dict(https='proxy.example.com:3128')) 1108 o.add_handler(ph) 1109 https_handler = MockHTTPSHandler() 1110 o.add_handler(https_handler) 1111 req = Request("https://www.example.com/") 1112 req.add_header("Proxy-Authorization","FooBar") 1113 req.add_header("User-Agent","Grail") 1114 self.assertEqual(req.get_host(), "www.example.com") 1115 self.assertIsNone(req._tunnel_host) 1116 r = o.open(req) 1117 # Verify Proxy-Authorization gets tunneled to request. 1118 # httpsconn req_headers do not have the Proxy-Authorization header but 1119 # the req will have. 1120 self.assertNotIn(("Proxy-Authorization","FooBar"), 1121 https_handler.httpconn.req_headers) 1122 self.assertIn(("User-Agent","Grail"), 1123 https_handler.httpconn.req_headers) 1124 self.assertIsNotNone(req._tunnel_host) 1125 self.assertEqual(req.get_host(), "proxy.example.com:3128") 1126 self.assertEqual(req.get_header("Proxy-authorization"),"FooBar") 1127 1128 def test_basic_auth(self, quote_char='"'): 1129 opener = OpenerDirector() 1130 password_manager = MockPasswordManager() 1131 auth_handler = urllib2.HTTPBasicAuthHandler(password_manager) 1132 realm = "ACME Widget Store" 1133 http_handler = MockHTTPHandler( 1134 401, 'WWW-Authenticate: Basic realm=%s%s%s\r\n\r\n' % 1135 (quote_char, realm, quote_char) ) 1136 opener.add_handler(auth_handler) 1137 opener.add_handler(http_handler) 1138 self._test_basic_auth(opener, auth_handler, "Authorization", 1139 realm, http_handler, password_manager, 1140 "http://acme.example.com/protected", 1141 "http://acme.example.com/protected" 1142 ) 1143 1144 def test_basic_auth_with_single_quoted_realm(self): 1145 self.test_basic_auth(quote_char="'") 1146 1147 def test_basic_auth_with_unquoted_realm(self): 1148 opener = OpenerDirector() 1149 password_manager = MockPasswordManager() 1150 auth_handler = urllib2.HTTPBasicAuthHandler(password_manager) 1151 realm = "ACME Widget Store" 1152 http_handler = MockHTTPHandler( 1153 401, 'WWW-Authenticate: Basic realm=%s\r\n\r\n' % realm) 1154 opener.add_handler(auth_handler) 1155 opener.add_handler(http_handler) 1156 msg = "Basic Auth Realm was unquoted" 1157 with test_support.check_warnings((msg, UserWarning)): 1158 self._test_basic_auth(opener, auth_handler, "Authorization", 1159 realm, http_handler, password_manager, 1160 "http://acme.example.com/protected", 1161 "http://acme.example.com/protected" 1162 ) 1163 1164 1165 def test_proxy_basic_auth(self): 1166 opener = OpenerDirector() 1167 ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128")) 1168 opener.add_handler(ph) 1169 password_manager = MockPasswordManager() 1170 auth_handler = urllib2.ProxyBasicAuthHandler(password_manager) 1171 realm = "ACME Networks" 1172 http_handler = MockHTTPHandler( 1173 407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm) 1174 opener.add_handler(auth_handler) 1175 opener.add_handler(http_handler) 1176 self._test_basic_auth(opener, auth_handler, "Proxy-authorization", 1177 realm, http_handler, password_manager, 1178 "http://acme.example.com:3128/protected", 1179 "proxy.example.com:3128", 1180 ) 1181 1182 def test_basic_and_digest_auth_handlers(self): 1183 # HTTPDigestAuthHandler raised an exception if it couldn't handle a 40* 1184 # response (http://python.org/sf/1479302), where it should instead 1185 # return None to allow another handler (especially 1186 # HTTPBasicAuthHandler) to handle the response. 1187 1188 # Also (http://python.org/sf/14797027, RFC 2617 section 1.2), we must 1189 # try digest first (since it's the strongest auth scheme), so we record 1190 # order of calls here to check digest comes first: 1191 class RecordingOpenerDirector(OpenerDirector): 1192 def __init__(self): 1193 OpenerDirector.__init__(self) 1194 self.recorded = [] 1195 def record(self, info): 1196 self.recorded.append(info) 1197 class TestDigestAuthHandler(urllib2.HTTPDigestAuthHandler): 1198 def http_error_401(self, *args, **kwds): 1199 self.parent.record("digest") 1200 urllib2.HTTPDigestAuthHandler.http_error_401(self, 1201 *args, **kwds) 1202 class TestBasicAuthHandler(urllib2.HTTPBasicAuthHandler): 1203 def http_error_401(self, *args, **kwds): 1204 self.parent.record("basic") 1205 urllib2.HTTPBasicAuthHandler.http_error_401(self, 1206 *args, **kwds) 1207 1208 opener = RecordingOpenerDirector() 1209 password_manager = MockPasswordManager() 1210 digest_handler = TestDigestAuthHandler(password_manager) 1211 basic_handler = TestBasicAuthHandler(password_manager) 1212 realm = "ACME Networks" 1213 http_handler = MockHTTPHandler( 1214 401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm) 1215 opener.add_handler(basic_handler) 1216 opener.add_handler(digest_handler) 1217 opener.add_handler(http_handler) 1218 1219 # check basic auth isn't blocked by digest handler failing 1220 self._test_basic_auth(opener, basic_handler, "Authorization", 1221 realm, http_handler, password_manager, 1222 "http://acme.example.com/protected", 1223 "http://acme.example.com/protected", 1224 ) 1225 # check digest was tried before basic (twice, because 1226 # _test_basic_auth called .open() twice) 1227 self.assertEqual(opener.recorded, ["digest", "basic"]*2) 1228 1229 def _test_basic_auth(self, opener, auth_handler, auth_header, 1230 realm, http_handler, password_manager, 1231 request_url, protected_url): 1232 import base64 1233 user, password = "wile", "coyote" 1234 1235 # .add_password() fed through to password manager 1236 auth_handler.add_password(realm, request_url, user, password) 1237 self.assertEqual(realm, password_manager.realm) 1238 self.assertEqual(request_url, password_manager.url) 1239 self.assertEqual(user, password_manager.user) 1240 self.assertEqual(password, password_manager.password) 1241 1242 r = opener.open(request_url) 1243 1244 # should have asked the password manager for the username/password 1245 self.assertEqual(password_manager.target_realm, realm) 1246 self.assertEqual(password_manager.target_url, protected_url) 1247 1248 # expect one request without authorization, then one with 1249 self.assertEqual(len(http_handler.requests), 2) 1250 self.assertFalse(http_handler.requests[0].has_header(auth_header)) 1251 userpass = '%s:%s' % (user, password) 1252 auth_hdr_value = 'Basic '+base64.encodestring(userpass).strip() 1253 self.assertEqual(http_handler.requests[1].get_header(auth_header), 1254 auth_hdr_value) 1255 self.assertEqual(http_handler.requests[1].unredirected_hdrs[auth_header], 1256 auth_hdr_value) 1257 # if the password manager can't find a password, the handler won't 1258 # handle the HTTP auth error 1259 password_manager.user = password_manager.password = None 1260 http_handler.reset() 1261 r = opener.open(request_url) 1262 self.assertEqual(len(http_handler.requests), 1) 1263 self.assertFalse(http_handler.requests[0].has_header(auth_header)) 1264 1265class MiscTests(unittest.TestCase): 1266 1267 def test_build_opener(self): 1268 class MyHTTPHandler(urllib2.HTTPHandler): pass 1269 class FooHandler(urllib2.BaseHandler): 1270 def foo_open(self): pass 1271 class BarHandler(urllib2.BaseHandler): 1272 def bar_open(self): pass 1273 1274 build_opener = urllib2.build_opener 1275 1276 o = build_opener(FooHandler, BarHandler) 1277 self.opener_has_handler(o, FooHandler) 1278 self.opener_has_handler(o, BarHandler) 1279 1280 # can take a mix of classes and instances 1281 o = build_opener(FooHandler, BarHandler()) 1282 self.opener_has_handler(o, FooHandler) 1283 self.opener_has_handler(o, BarHandler) 1284 1285 # subclasses of default handlers override default handlers 1286 o = build_opener(MyHTTPHandler) 1287 self.opener_has_handler(o, MyHTTPHandler) 1288 1289 # a particular case of overriding: default handlers can be passed 1290 # in explicitly 1291 o = build_opener() 1292 self.opener_has_handler(o, urllib2.HTTPHandler) 1293 o = build_opener(urllib2.HTTPHandler) 1294 self.opener_has_handler(o, urllib2.HTTPHandler) 1295 o = build_opener(urllib2.HTTPHandler()) 1296 self.opener_has_handler(o, urllib2.HTTPHandler) 1297 1298 # Issue2670: multiple handlers sharing the same base class 1299 class MyOtherHTTPHandler(urllib2.HTTPHandler): pass 1300 o = build_opener(MyHTTPHandler, MyOtherHTTPHandler) 1301 self.opener_has_handler(o, MyHTTPHandler) 1302 self.opener_has_handler(o, MyOtherHTTPHandler) 1303 1304 def opener_has_handler(self, opener, handler_class): 1305 for h in opener.handlers: 1306 if h.__class__ == handler_class: 1307 break 1308 else: 1309 self.assertTrue(False) 1310 1311 def test_unsupported_algorithm(self): 1312 handler = AbstractDigestAuthHandler() 1313 with self.assertRaises(ValueError) as exc: 1314 handler.get_algorithm_impls('invalid') 1315 self.assertEqual( 1316 str(exc.exception), 1317 "Unsupported digest authentication algorithm 'invalid'" 1318 ) 1319 1320 1321class RequestTests(unittest.TestCase): 1322 1323 def setUp(self): 1324 self.get = urllib2.Request("http://www.python.org/~jeremy/") 1325 self.post = urllib2.Request("http://www.python.org/~jeremy/", 1326 "data", 1327 headers={"X-Test": "test"}) 1328 1329 def test_method(self): 1330 self.assertEqual("POST", self.post.get_method()) 1331 self.assertEqual("GET", self.get.get_method()) 1332 1333 def test_add_data(self): 1334 self.assertTrue(not self.get.has_data()) 1335 self.assertEqual("GET", self.get.get_method()) 1336 self.get.add_data("spam") 1337 self.assertTrue(self.get.has_data()) 1338 self.assertEqual("POST", self.get.get_method()) 1339 1340 def test_get_full_url(self): 1341 self.assertEqual("http://www.python.org/~jeremy/", 1342 self.get.get_full_url()) 1343 1344 def test_selector(self): 1345 self.assertEqual("/~jeremy/", self.get.get_selector()) 1346 req = urllib2.Request("http://www.python.org/") 1347 self.assertEqual("/", req.get_selector()) 1348 1349 def test_get_type(self): 1350 self.assertEqual("http", self.get.get_type()) 1351 1352 def test_get_host(self): 1353 self.assertEqual("www.python.org", self.get.get_host()) 1354 1355 def test_get_host_unquote(self): 1356 req = urllib2.Request("http://www.%70ython.org/") 1357 self.assertEqual("www.python.org", req.get_host()) 1358 1359 def test_proxy(self): 1360 self.assertTrue(not self.get.has_proxy()) 1361 self.get.set_proxy("www.perl.org", "http") 1362 self.assertTrue(self.get.has_proxy()) 1363 self.assertEqual("www.python.org", self.get.get_origin_req_host()) 1364 self.assertEqual("www.perl.org", self.get.get_host()) 1365 1366 def test_wrapped_url(self): 1367 req = Request("<URL:http://www.python.org>") 1368 self.assertEqual("www.python.org", req.get_host()) 1369 1370 def test_url_fragment(self): 1371 req = Request("http://www.python.org/?qs=query#fragment=true") 1372 self.assertEqual("/?qs=query", req.get_selector()) 1373 req = Request("http://www.python.org/#fun=true") 1374 self.assertEqual("/", req.get_selector()) 1375 1376 # Issue 11703: geturl() omits fragment in the original URL. 1377 url = 'http://docs.python.org/library/urllib2.html#OK' 1378 req = Request(url) 1379 self.assertEqual(req.get_full_url(), url) 1380 1381 def test_private_attributes(self): 1382 self.assertFalse(hasattr(self.get, '_Request__r_xxx')) 1383 # Issue #6500: infinite recursion 1384 self.assertFalse(hasattr(self.get, '_Request__r_method')) 1385 1386 def test_HTTPError_interface(self): 1387 """ 1388 Issue 13211 reveals that HTTPError didn't implement the URLError 1389 interface even though HTTPError is a subclass of URLError. 1390 1391 >>> err = urllib2.HTTPError(msg='something bad happened', url=None, code=None, hdrs=None, fp=None) 1392 >>> assert hasattr(err, 'reason') 1393 >>> err.reason 1394 'something bad happened' 1395 """ 1396 1397 def test_HTTPError_interface_call(self): 1398 """ 1399 Issue 15701= - HTTPError interface has info method available from URLError. 1400 """ 1401 err = urllib2.HTTPError(msg='something bad happened', url=None, 1402 code=None, hdrs='Content-Length:42', fp=None) 1403 self.assertTrue(hasattr(err, 'reason')) 1404 assert hasattr(err, 'reason') 1405 assert hasattr(err, 'info') 1406 assert callable(err.info) 1407 try: 1408 err.info() 1409 except AttributeError: 1410 self.fail("err.info() failed") 1411 self.assertEqual(err.info(), "Content-Length:42") 1412 1413def test_main(verbose=None): 1414 from test import test_urllib2 1415 test_support.run_doctest(test_urllib2, verbose) 1416 test_support.run_doctest(urllib2, verbose) 1417 tests = (TrivialTests, 1418 OpenerDirectorTests, 1419 HandlerTests, 1420 MiscTests, 1421 RequestTests) 1422 test_support.run_unittest(*tests) 1423 1424if __name__ == "__main__": 1425 test_main(verbose=True) 1426