1import unittest 2from test import support 3from test import test_urllib 4 5import os 6import io 7import socket 8import array 9import sys 10import tempfile 11import subprocess 12 13import urllib.request 14# The proxy bypass method imported below has logic specific to the OSX 15# proxy config data structure but is testable on all platforms. 16from urllib.request import (Request, OpenerDirector, HTTPBasicAuthHandler, 17 HTTPPasswordMgrWithPriorAuth, _parse_proxy, 18 _proxy_bypass_macosx_sysconf, 19 AbstractDigestAuthHandler) 20from urllib.parse import urlparse 21import urllib.error 22import http.client 23 24# XXX 25# Request 26# CacheFTPHandler (hard to write) 27# parse_keqv_list, parse_http_list, HTTPDigestAuthHandler 28 29 30class TrivialTests(unittest.TestCase): 31 32 def test___all__(self): 33 # Verify which names are exposed 34 for module in 'request', 'response', 'parse', 'error', 'robotparser': 35 context = {} 36 exec('from urllib.%s import *' % module, context) 37 del context['__builtins__'] 38 if module == 'request' and os.name == 'nt': 39 u, p = context.pop('url2pathname'), context.pop('pathname2url') 40 self.assertEqual(u.__module__, 'nturl2path') 41 self.assertEqual(p.__module__, 'nturl2path') 42 for k, v in context.items(): 43 self.assertEqual(v.__module__, 'urllib.%s' % module, 44 "%r is exposed in 'urllib.%s' but defined in %r" % 45 (k, module, v.__module__)) 46 47 def test_trivial(self): 48 # A couple trivial tests 49 50 self.assertRaises(ValueError, urllib.request.urlopen, 'bogus url') 51 52 # XXX Name hacking to get this to work on Windows. 53 fname = os.path.abspath(urllib.request.__file__).replace(os.sep, '/') 54 55 if os.name == 'nt': 56 file_url = "file:///%s" % fname 57 else: 58 file_url = "file://%s" % fname 59 60 with urllib.request.urlopen(file_url) as f: 61 f.read() 62 63 def test_parse_http_list(self): 64 tests = [ 65 ('a,b,c', ['a', 'b', 'c']), 66 ('path"o,l"og"i"cal, example', ['path"o,l"og"i"cal', 'example']), 67 ('a, b, "c", "d", "e,f", g, h', 68 ['a', 'b', '"c"', '"d"', '"e,f"', 'g', 'h']), 69 ('a="b\\"c", d="e\\,f", g="h\\\\i"', 70 ['a="b"c"', 'd="e,f"', 'g="h\\i"'])] 71 for string, list in tests: 72 self.assertEqual(urllib.request.parse_http_list(string), list) 73 74 def test_URLError_reasonstr(self): 75 err = urllib.error.URLError('reason') 76 self.assertIn(err.reason, str(err)) 77 78 79class RequestHdrsTests(unittest.TestCase): 80 81 def test_request_headers_dict(self): 82 """ 83 The Request.headers dictionary is not a documented interface. It 84 should stay that way, because the complete set of headers are only 85 accessible through the .get_header(), .has_header(), .header_items() 86 interface. However, .headers pre-dates those methods, and so real code 87 will be using the dictionary. 88 89 The introduction in 2.4 of those methods was a mistake for the same 90 reason: code that previously saw all (urllib2 user)-provided headers in 91 .headers now sees only a subset. 92 93 """ 94 url = "http://example.com" 95 self.assertEqual(Request(url, 96 headers={"Spam-eggs": "blah"} 97 ).headers["Spam-eggs"], "blah") 98 self.assertEqual(Request(url, 99 headers={"spam-EggS": "blah"} 100 ).headers["Spam-eggs"], "blah") 101 102 def test_request_headers_methods(self): 103 """ 104 Note the case normalization of header names here, to 105 .capitalize()-case. This should be preserved for 106 backwards-compatibility. (In the HTTP case, normalization to 107 .title()-case is done by urllib2 before sending headers to 108 http.client). 109 110 Note that e.g. r.has_header("spam-EggS") is currently False, and 111 r.get_header("spam-EggS") returns None, but that could be changed in 112 future. 113 114 Method r.remove_header should remove items both from r.headers and 115 r.unredirected_hdrs dictionaries 116 """ 117 url = "http://example.com" 118 req = Request(url, headers={"Spam-eggs": "blah"}) 119 self.assertTrue(req.has_header("Spam-eggs")) 120 self.assertEqual(req.header_items(), [('Spam-eggs', 'blah')]) 121 122 req.add_header("Foo-Bar", "baz") 123 self.assertEqual(sorted(req.header_items()), 124 [('Foo-bar', 'baz'), ('Spam-eggs', 'blah')]) 125 self.assertFalse(req.has_header("Not-there")) 126 self.assertIsNone(req.get_header("Not-there")) 127 self.assertEqual(req.get_header("Not-there", "default"), "default") 128 129 req.remove_header("Spam-eggs") 130 self.assertFalse(req.has_header("Spam-eggs")) 131 132 req.add_unredirected_header("Unredirected-spam", "Eggs") 133 self.assertTrue(req.has_header("Unredirected-spam")) 134 135 req.remove_header("Unredirected-spam") 136 self.assertFalse(req.has_header("Unredirected-spam")) 137 138 def test_password_manager(self): 139 mgr = urllib.request.HTTPPasswordMgr() 140 add = mgr.add_password 141 find_user_pass = mgr.find_user_password 142 143 add("Some Realm", "http://example.com/", "joe", "password") 144 add("Some Realm", "http://example.com/ni", "ni", "ni") 145 add("Some Realm", "http://c.example.com:3128", "3", "c") 146 add("Some Realm", "d.example.com", "4", "d") 147 add("Some Realm", "e.example.com:3128", "5", "e") 148 149 # For the same realm, password set the highest path is the winner. 150 self.assertEqual(find_user_pass("Some Realm", "example.com"), 151 ('joe', 'password')) 152 self.assertEqual(find_user_pass("Some Realm", "http://example.com/ni"), 153 ('joe', 'password')) 154 self.assertEqual(find_user_pass("Some Realm", "http://example.com"), 155 ('joe', 'password')) 156 self.assertEqual(find_user_pass("Some Realm", "http://example.com/"), 157 ('joe', 'password')) 158 self.assertEqual(find_user_pass("Some Realm", 159 "http://example.com/spam"), 160 ('joe', 'password')) 161 162 self.assertEqual(find_user_pass("Some Realm", 163 "http://example.com/spam/spam"), 164 ('joe', 'password')) 165 166 # You can have different passwords for different paths. 167 168 add("c", "http://example.com/foo", "foo", "ni") 169 add("c", "http://example.com/bar", "bar", "nini") 170 171 self.assertEqual(find_user_pass("c", "http://example.com/foo"), 172 ('foo', 'ni')) 173 174 self.assertEqual(find_user_pass("c", "http://example.com/bar"), 175 ('bar', 'nini')) 176 177 # For the same path, newer password should be considered. 178 179 add("b", "http://example.com/", "first", "blah") 180 add("b", "http://example.com/", "second", "spam") 181 182 self.assertEqual(find_user_pass("b", "http://example.com/"), 183 ('second', 'spam')) 184 185 # No special relationship between a.example.com and example.com: 186 187 add("a", "http://example.com", "1", "a") 188 self.assertEqual(find_user_pass("a", "http://example.com/"), 189 ('1', 'a')) 190 191 self.assertEqual(find_user_pass("a", "http://a.example.com/"), 192 (None, None)) 193 194 # Ports: 195 196 self.assertEqual(find_user_pass("Some Realm", "c.example.com"), 197 (None, None)) 198 self.assertEqual(find_user_pass("Some Realm", "c.example.com:3128"), 199 ('3', 'c')) 200 self.assertEqual( 201 find_user_pass("Some Realm", "http://c.example.com:3128"), 202 ('3', 'c')) 203 self.assertEqual(find_user_pass("Some Realm", "d.example.com"), 204 ('4', 'd')) 205 self.assertEqual(find_user_pass("Some Realm", "e.example.com:3128"), 206 ('5', 'e')) 207 208 def test_password_manager_default_port(self): 209 """ 210 The point to note here is that we can't guess the default port if 211 there's no scheme. This applies to both add_password and 212 find_user_password. 213 """ 214 mgr = urllib.request.HTTPPasswordMgr() 215 add = mgr.add_password 216 find_user_pass = mgr.find_user_password 217 add("f", "http://g.example.com:80", "10", "j") 218 add("g", "http://h.example.com", "11", "k") 219 add("h", "i.example.com:80", "12", "l") 220 add("i", "j.example.com", "13", "m") 221 self.assertEqual(find_user_pass("f", "g.example.com:100"), 222 (None, None)) 223 self.assertEqual(find_user_pass("f", "g.example.com:80"), 224 ('10', 'j')) 225 self.assertEqual(find_user_pass("f", "g.example.com"), 226 (None, None)) 227 self.assertEqual(find_user_pass("f", "http://g.example.com:100"), 228 (None, None)) 229 self.assertEqual(find_user_pass("f", "http://g.example.com:80"), 230 ('10', 'j')) 231 self.assertEqual(find_user_pass("f", "http://g.example.com"), 232 ('10', 'j')) 233 self.assertEqual(find_user_pass("g", "h.example.com"), ('11', 'k')) 234 self.assertEqual(find_user_pass("g", "h.example.com:80"), ('11', 'k')) 235 self.assertEqual(find_user_pass("g", "http://h.example.com:80"), 236 ('11', 'k')) 237 self.assertEqual(find_user_pass("h", "i.example.com"), (None, None)) 238 self.assertEqual(find_user_pass("h", "i.example.com:80"), ('12', 'l')) 239 self.assertEqual(find_user_pass("h", "http://i.example.com:80"), 240 ('12', 'l')) 241 self.assertEqual(find_user_pass("i", "j.example.com"), ('13', 'm')) 242 self.assertEqual(find_user_pass("i", "j.example.com:80"), 243 (None, None)) 244 self.assertEqual(find_user_pass("i", "http://j.example.com"), 245 ('13', 'm')) 246 self.assertEqual(find_user_pass("i", "http://j.example.com:80"), 247 (None, None)) 248 249 250class MockOpener: 251 addheaders = [] 252 253 def open(self, req, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT): 254 self.req, self.data, self.timeout = req, data, timeout 255 256 def error(self, proto, *args): 257 self.proto, self.args = proto, args 258 259 260class MockFile: 261 def read(self, count=None): 262 pass 263 264 def readline(self, count=None): 265 pass 266 267 def close(self): 268 pass 269 270 271class MockHeaders(dict): 272 def getheaders(self, name): 273 return list(self.values()) 274 275 276class MockResponse(io.StringIO): 277 def __init__(self, code, msg, headers, data, url=None): 278 io.StringIO.__init__(self, data) 279 self.code, self.msg, self.headers, self.url = code, msg, headers, url 280 281 def info(self): 282 return self.headers 283 284 def geturl(self): 285 return self.url 286 287 288class MockCookieJar: 289 def add_cookie_header(self, request): 290 self.ach_req = request 291 292 def extract_cookies(self, response, request): 293 self.ec_req, self.ec_r = request, response 294 295 296class FakeMethod: 297 def __init__(self, meth_name, action, handle): 298 self.meth_name = meth_name 299 self.handle = handle 300 self.action = action 301 302 def __call__(self, *args): 303 return self.handle(self.meth_name, self.action, *args) 304 305 306class MockHTTPResponse(io.IOBase): 307 def __init__(self, fp, msg, status, reason): 308 self.fp = fp 309 self.msg = msg 310 self.status = status 311 self.reason = reason 312 self.code = 200 313 314 def read(self): 315 return '' 316 317 def info(self): 318 return {} 319 320 def geturl(self): 321 return self.url 322 323 324class MockHTTPClass: 325 def __init__(self): 326 self.level = 0 327 self.req_headers = [] 328 self.data = None 329 self.raise_on_endheaders = False 330 self.sock = None 331 self._tunnel_headers = {} 332 333 def __call__(self, host, timeout=socket._GLOBAL_DEFAULT_TIMEOUT): 334 self.host = host 335 self.timeout = timeout 336 return self 337 338 def set_debuglevel(self, level): 339 self.level = level 340 341 def set_tunnel(self, host, port=None, headers=None): 342 self._tunnel_host = host 343 self._tunnel_port = port 344 if headers: 345 self._tunnel_headers = headers 346 else: 347 self._tunnel_headers.clear() 348 349 def request(self, method, url, body=None, headers=None, *, 350 encode_chunked=False): 351 self.method = method 352 self.selector = url 353 if headers is not None: 354 self.req_headers += headers.items() 355 self.req_headers.sort() 356 if body: 357 self.data = body 358 self.encode_chunked = encode_chunked 359 if self.raise_on_endheaders: 360 raise OSError() 361 362 def getresponse(self): 363 return MockHTTPResponse(MockFile(), {}, 200, "OK") 364 365 def close(self): 366 pass 367 368 369class MockHandler: 370 # useful for testing handler machinery 371 # see add_ordered_mock_handlers() docstring 372 handler_order = 500 373 374 def __init__(self, methods): 375 self._define_methods(methods) 376 377 def _define_methods(self, methods): 378 for spec in methods: 379 if len(spec) == 2: 380 name, action = spec 381 else: 382 name, action = spec, None 383 meth = FakeMethod(name, action, self.handle) 384 setattr(self.__class__, name, meth) 385 386 def handle(self, fn_name, action, *args, **kwds): 387 self.parent.calls.append((self, fn_name, args, kwds)) 388 if action is None: 389 return None 390 elif action == "return self": 391 return self 392 elif action == "return response": 393 res = MockResponse(200, "OK", {}, "") 394 return res 395 elif action == "return request": 396 return Request("http://blah/") 397 elif action.startswith("error"): 398 code = action[action.rfind(" ")+1:] 399 try: 400 code = int(code) 401 except ValueError: 402 pass 403 res = MockResponse(200, "OK", {}, "") 404 return self.parent.error("http", args[0], res, code, "", {}) 405 elif action == "raise": 406 raise urllib.error.URLError("blah") 407 assert False 408 409 def close(self): 410 pass 411 412 def add_parent(self, parent): 413 self.parent = parent 414 self.parent.calls = [] 415 416 def __lt__(self, other): 417 if not hasattr(other, "handler_order"): 418 # No handler_order, leave in original order. Yuck. 419 return True 420 return self.handler_order < other.handler_order 421 422 423def add_ordered_mock_handlers(opener, meth_spec): 424 """Create MockHandlers and add them to an OpenerDirector. 425 426 meth_spec: list of lists of tuples and strings defining methods to define 427 on handlers. eg: 428 429 [["http_error", "ftp_open"], ["http_open"]] 430 431 defines methods .http_error() and .ftp_open() on one handler, and 432 .http_open() on another. These methods just record their arguments and 433 return None. Using a tuple instead of a string causes the method to 434 perform some action (see MockHandler.handle()), eg: 435 436 [["http_error"], [("http_open", "return request")]] 437 438 defines .http_error() on one handler (which simply returns None), and 439 .http_open() on another handler, which returns a Request object. 440 441 """ 442 handlers = [] 443 count = 0 444 for meths in meth_spec: 445 class MockHandlerSubclass(MockHandler): 446 pass 447 448 h = MockHandlerSubclass(meths) 449 h.handler_order += count 450 h.add_parent(opener) 451 count = count + 1 452 handlers.append(h) 453 opener.add_handler(h) 454 return handlers 455 456 457def build_test_opener(*handler_instances): 458 opener = OpenerDirector() 459 for h in handler_instances: 460 opener.add_handler(h) 461 return opener 462 463 464class MockHTTPHandler(urllib.request.BaseHandler): 465 # useful for testing redirections and auth 466 # sends supplied headers and code as first response 467 # sends 200 OK as second response 468 def __init__(self, code, headers): 469 self.code = code 470 self.headers = headers 471 self.reset() 472 473 def reset(self): 474 self._count = 0 475 self.requests = [] 476 477 def http_open(self, req): 478 import email, copy 479 self.requests.append(copy.deepcopy(req)) 480 if self._count == 0: 481 self._count = self._count + 1 482 name = http.client.responses[self.code] 483 msg = email.message_from_string(self.headers) 484 return self.parent.error( 485 "http", req, MockFile(), self.code, name, msg) 486 else: 487 self.req = req 488 msg = email.message_from_string("\r\n\r\n") 489 return MockResponse(200, "OK", msg, "", req.get_full_url()) 490 491 492class MockHTTPSHandler(urllib.request.AbstractHTTPHandler): 493 # Useful for testing the Proxy-Authorization request by verifying the 494 # properties of httpcon 495 496 def __init__(self, debuglevel=0): 497 urllib.request.AbstractHTTPHandler.__init__(self, debuglevel=debuglevel) 498 self.httpconn = MockHTTPClass() 499 500 def https_open(self, req): 501 return self.do_open(self.httpconn, req) 502 503 504class MockHTTPHandlerCheckAuth(urllib.request.BaseHandler): 505 # useful for testing auth 506 # sends supplied code response 507 # checks if auth header is specified in request 508 def __init__(self, code): 509 self.code = code 510 self.has_auth_header = False 511 512 def reset(self): 513 self.has_auth_header = False 514 515 def http_open(self, req): 516 if req.has_header('Authorization'): 517 self.has_auth_header = True 518 name = http.client.responses[self.code] 519 return MockResponse(self.code, name, MockFile(), "", req.get_full_url()) 520 521 522 523class MockPasswordManager: 524 def add_password(self, realm, uri, user, password): 525 self.realm = realm 526 self.url = uri 527 self.user = user 528 self.password = password 529 530 def find_user_password(self, realm, authuri): 531 self.target_realm = realm 532 self.target_url = authuri 533 return self.user, self.password 534 535 536class OpenerDirectorTests(unittest.TestCase): 537 538 def test_add_non_handler(self): 539 class NonHandler(object): 540 pass 541 self.assertRaises(TypeError, 542 OpenerDirector().add_handler, NonHandler()) 543 544 def test_badly_named_methods(self): 545 # test work-around for three methods that accidentally follow the 546 # naming conventions for handler methods 547 # (*_open() / *_request() / *_response()) 548 549 # These used to call the accidentally-named methods, causing a 550 # TypeError in real code; here, returning self from these mock 551 # methods would either cause no exception, or AttributeError. 552 553 from urllib.error import URLError 554 555 o = OpenerDirector() 556 meth_spec = [ 557 [("do_open", "return self"), ("proxy_open", "return self")], 558 [("redirect_request", "return self")], 559 ] 560 add_ordered_mock_handlers(o, meth_spec) 561 o.add_handler(urllib.request.UnknownHandler()) 562 for scheme in "do", "proxy", "redirect": 563 self.assertRaises(URLError, o.open, scheme+"://example.com/") 564 565 def test_handled(self): 566 # handler returning non-None means no more handlers will be called 567 o = OpenerDirector() 568 meth_spec = [ 569 ["http_open", "ftp_open", "http_error_302"], 570 ["ftp_open"], 571 [("http_open", "return self")], 572 [("http_open", "return self")], 573 ] 574 handlers = add_ordered_mock_handlers(o, meth_spec) 575 576 req = Request("http://example.com/") 577 r = o.open(req) 578 # Second .http_open() gets called, third doesn't, since second returned 579 # non-None. Handlers without .http_open() never get any methods called 580 # on them. 581 # In fact, second mock handler defining .http_open() returns self 582 # (instead of response), which becomes the OpenerDirector's return 583 # value. 584 self.assertEqual(r, handlers[2]) 585 calls = [(handlers[0], "http_open"), (handlers[2], "http_open")] 586 for expected, got in zip(calls, o.calls): 587 handler, name, args, kwds = got 588 self.assertEqual((handler, name), expected) 589 self.assertEqual(args, (req,)) 590 591 def test_handler_order(self): 592 o = OpenerDirector() 593 handlers = [] 594 for meths, handler_order in [([("http_open", "return self")], 500), 595 (["http_open"], 0)]: 596 class MockHandlerSubclass(MockHandler): 597 pass 598 599 h = MockHandlerSubclass(meths) 600 h.handler_order = handler_order 601 handlers.append(h) 602 o.add_handler(h) 603 604 o.open("http://example.com/") 605 # handlers called in reverse order, thanks to their sort order 606 self.assertEqual(o.calls[0][0], handlers[1]) 607 self.assertEqual(o.calls[1][0], handlers[0]) 608 609 def test_raise(self): 610 # raising URLError stops processing of request 611 o = OpenerDirector() 612 meth_spec = [ 613 [("http_open", "raise")], 614 [("http_open", "return self")], 615 ] 616 handlers = add_ordered_mock_handlers(o, meth_spec) 617 618 req = Request("http://example.com/") 619 self.assertRaises(urllib.error.URLError, o.open, req) 620 self.assertEqual(o.calls, [(handlers[0], "http_open", (req,), {})]) 621 622 def test_http_error(self): 623 # XXX http_error_default 624 # http errors are a special case 625 o = OpenerDirector() 626 meth_spec = [ 627 [("http_open", "error 302")], 628 [("http_error_400", "raise"), "http_open"], 629 [("http_error_302", "return response"), "http_error_303", 630 "http_error"], 631 [("http_error_302")], 632 ] 633 handlers = add_ordered_mock_handlers(o, meth_spec) 634 635 class Unknown: 636 def __eq__(self, other): 637 return True 638 639 req = Request("http://example.com/") 640 o.open(req) 641 assert len(o.calls) == 2 642 calls = [(handlers[0], "http_open", (req,)), 643 (handlers[2], "http_error_302", 644 (req, Unknown(), 302, "", {}))] 645 for expected, got in zip(calls, o.calls): 646 handler, method_name, args = expected 647 self.assertEqual((handler, method_name), got[:2]) 648 self.assertEqual(args, got[2]) 649 650 def test_processors(self): 651 # *_request / *_response methods get called appropriately 652 o = OpenerDirector() 653 meth_spec = [ 654 [("http_request", "return request"), 655 ("http_response", "return response")], 656 [("http_request", "return request"), 657 ("http_response", "return response")], 658 ] 659 handlers = add_ordered_mock_handlers(o, meth_spec) 660 661 req = Request("http://example.com/") 662 o.open(req) 663 # processor methods are called on *all* handlers that define them, 664 # not just the first handler that handles the request 665 calls = [ 666 (handlers[0], "http_request"), (handlers[1], "http_request"), 667 (handlers[0], "http_response"), (handlers[1], "http_response")] 668 669 for i, (handler, name, args, kwds) in enumerate(o.calls): 670 if i < 2: 671 # *_request 672 self.assertEqual((handler, name), calls[i]) 673 self.assertEqual(len(args), 1) 674 self.assertIsInstance(args[0], Request) 675 else: 676 # *_response 677 self.assertEqual((handler, name), calls[i]) 678 self.assertEqual(len(args), 2) 679 self.assertIsInstance(args[0], Request) 680 # response from opener.open is None, because there's no 681 # handler that defines http_open to handle it 682 if args[1] is not None: 683 self.assertIsInstance(args[1], MockResponse) 684 685 686def sanepathname2url(path): 687 try: 688 path.encode("utf-8") 689 except UnicodeEncodeError: 690 raise unittest.SkipTest("path is not encodable to utf8") 691 urlpath = urllib.request.pathname2url(path) 692 if os.name == "nt" and urlpath.startswith("///"): 693 urlpath = urlpath[2:] 694 # XXX don't ask me about the mac... 695 return urlpath 696 697 698class HandlerTests(unittest.TestCase): 699 700 def test_ftp(self): 701 class MockFTPWrapper: 702 def __init__(self, data): 703 self.data = data 704 705 def retrfile(self, filename, filetype): 706 self.filename, self.filetype = filename, filetype 707 return io.StringIO(self.data), len(self.data) 708 709 def close(self): 710 pass 711 712 class NullFTPHandler(urllib.request.FTPHandler): 713 def __init__(self, data): 714 self.data = data 715 716 def connect_ftp(self, user, passwd, host, port, dirs, 717 timeout=socket._GLOBAL_DEFAULT_TIMEOUT): 718 self.user, self.passwd = user, passwd 719 self.host, self.port = host, port 720 self.dirs = dirs 721 self.ftpwrapper = MockFTPWrapper(self.data) 722 return self.ftpwrapper 723 724 import ftplib 725 data = "rheum rhaponicum" 726 h = NullFTPHandler(data) 727 h.parent = MockOpener() 728 729 for url, host, port, user, passwd, type_, dirs, filename, mimetype in [ 730 ("ftp://localhost/foo/bar/baz.html", 731 "localhost", ftplib.FTP_PORT, "", "", "I", 732 ["foo", "bar"], "baz.html", "text/html"), 733 ("ftp://parrot@localhost/foo/bar/baz.html", 734 "localhost", ftplib.FTP_PORT, "parrot", "", "I", 735 ["foo", "bar"], "baz.html", "text/html"), 736 ("ftp://%25parrot@localhost/foo/bar/baz.html", 737 "localhost", ftplib.FTP_PORT, "%parrot", "", "I", 738 ["foo", "bar"], "baz.html", "text/html"), 739 ("ftp://%2542parrot@localhost/foo/bar/baz.html", 740 "localhost", ftplib.FTP_PORT, "%42parrot", "", "I", 741 ["foo", "bar"], "baz.html", "text/html"), 742 ("ftp://localhost:80/foo/bar/", 743 "localhost", 80, "", "", "D", 744 ["foo", "bar"], "", None), 745 ("ftp://localhost/baz.gif;type=a", 746 "localhost", ftplib.FTP_PORT, "", "", "A", 747 [], "baz.gif", None), # XXX really this should guess image/gif 748 ]: 749 req = Request(url) 750 req.timeout = None 751 r = h.ftp_open(req) 752 # ftp authentication not yet implemented by FTPHandler 753 self.assertEqual(h.user, user) 754 self.assertEqual(h.passwd, passwd) 755 self.assertEqual(h.host, socket.gethostbyname(host)) 756 self.assertEqual(h.port, port) 757 self.assertEqual(h.dirs, dirs) 758 self.assertEqual(h.ftpwrapper.filename, filename) 759 self.assertEqual(h.ftpwrapper.filetype, type_) 760 headers = r.info() 761 self.assertEqual(headers.get("Content-type"), mimetype) 762 self.assertEqual(int(headers["Content-length"]), len(data)) 763 764 def test_file(self): 765 import email.utils 766 h = urllib.request.FileHandler() 767 o = h.parent = MockOpener() 768 769 TESTFN = support.TESTFN 770 urlpath = sanepathname2url(os.path.abspath(TESTFN)) 771 towrite = b"hello, world\n" 772 urls = [ 773 "file://localhost%s" % urlpath, 774 "file://%s" % urlpath, 775 "file://%s%s" % (socket.gethostbyname('localhost'), urlpath), 776 ] 777 try: 778 localaddr = socket.gethostbyname(socket.gethostname()) 779 except socket.gaierror: 780 localaddr = '' 781 if localaddr: 782 urls.append("file://%s%s" % (localaddr, urlpath)) 783 784 for url in urls: 785 f = open(TESTFN, "wb") 786 try: 787 try: 788 f.write(towrite) 789 finally: 790 f.close() 791 792 r = h.file_open(Request(url)) 793 try: 794 data = r.read() 795 headers = r.info() 796 respurl = r.geturl() 797 finally: 798 r.close() 799 stats = os.stat(TESTFN) 800 modified = email.utils.formatdate(stats.st_mtime, usegmt=True) 801 finally: 802 os.remove(TESTFN) 803 self.assertEqual(data, towrite) 804 self.assertEqual(headers["Content-type"], "text/plain") 805 self.assertEqual(headers["Content-length"], "13") 806 self.assertEqual(headers["Last-modified"], modified) 807 self.assertEqual(respurl, url) 808 809 for url in [ 810 "file://localhost:80%s" % urlpath, 811 "file:///file_does_not_exist.txt", 812 "file://not-a-local-host.com//dir/file.txt", 813 "file://%s:80%s/%s" % (socket.gethostbyname('localhost'), 814 os.getcwd(), TESTFN), 815 "file://somerandomhost.ontheinternet.com%s/%s" % 816 (os.getcwd(), TESTFN), 817 ]: 818 try: 819 f = open(TESTFN, "wb") 820 try: 821 f.write(towrite) 822 finally: 823 f.close() 824 825 self.assertRaises(urllib.error.URLError, 826 h.file_open, Request(url)) 827 finally: 828 os.remove(TESTFN) 829 830 h = urllib.request.FileHandler() 831 o = h.parent = MockOpener() 832 # XXXX why does // mean ftp (and /// mean not ftp!), and where 833 # is file: scheme specified? I think this is really a bug, and 834 # what was intended was to distinguish between URLs like: 835 # file:/blah.txt (a file) 836 # file://localhost/blah.txt (a file) 837 # file:///blah.txt (a file) 838 # file://ftp.example.com/blah.txt (an ftp URL) 839 for url, ftp in [ 840 ("file://ftp.example.com//foo.txt", False), 841 ("file://ftp.example.com///foo.txt", False), 842 ("file://ftp.example.com/foo.txt", False), 843 ("file://somehost//foo/something.txt", False), 844 ("file://localhost//foo/something.txt", False), 845 ]: 846 req = Request(url) 847 try: 848 h.file_open(req) 849 except urllib.error.URLError: 850 self.assertFalse(ftp) 851 else: 852 self.assertIs(o.req, req) 853 self.assertEqual(req.type, "ftp") 854 self.assertEqual(req.type == "ftp", ftp) 855 856 def test_http(self): 857 858 h = urllib.request.AbstractHTTPHandler() 859 o = h.parent = MockOpener() 860 861 url = "http://example.com/" 862 for method, data in [("GET", None), ("POST", b"blah")]: 863 req = Request(url, data, {"Foo": "bar"}) 864 req.timeout = None 865 req.add_unredirected_header("Spam", "eggs") 866 http = MockHTTPClass() 867 r = h.do_open(http, req) 868 869 # result attributes 870 r.read; r.readline # wrapped MockFile methods 871 r.info; r.geturl # addinfourl methods 872 r.code, r.msg == 200, "OK" # added from MockHTTPClass.getreply() 873 hdrs = r.info() 874 hdrs.get; hdrs.__contains__ # r.info() gives dict from .getreply() 875 self.assertEqual(r.geturl(), url) 876 877 self.assertEqual(http.host, "example.com") 878 self.assertEqual(http.level, 0) 879 self.assertEqual(http.method, method) 880 self.assertEqual(http.selector, "/") 881 self.assertEqual(http.req_headers, 882 [("Connection", "close"), 883 ("Foo", "bar"), ("Spam", "eggs")]) 884 self.assertEqual(http.data, data) 885 886 # check OSError converted to URLError 887 http.raise_on_endheaders = True 888 self.assertRaises(urllib.error.URLError, h.do_open, http, req) 889 890 # Check for TypeError on POST data which is str. 891 req = Request("http://example.com/","badpost") 892 self.assertRaises(TypeError, h.do_request_, req) 893 894 # check adding of standard headers 895 o.addheaders = [("Spam", "eggs")] 896 for data in b"", None: # POST, GET 897 req = Request("http://example.com/", data) 898 r = MockResponse(200, "OK", {}, "") 899 newreq = h.do_request_(req) 900 if data is None: # GET 901 self.assertNotIn("Content-length", req.unredirected_hdrs) 902 self.assertNotIn("Content-type", req.unredirected_hdrs) 903 else: # POST 904 self.assertEqual(req.unredirected_hdrs["Content-length"], "0") 905 self.assertEqual(req.unredirected_hdrs["Content-type"], 906 "application/x-www-form-urlencoded") 907 # XXX the details of Host could be better tested 908 self.assertEqual(req.unredirected_hdrs["Host"], "example.com") 909 self.assertEqual(req.unredirected_hdrs["Spam"], "eggs") 910 911 # don't clobber existing headers 912 req.add_unredirected_header("Content-length", "foo") 913 req.add_unredirected_header("Content-type", "bar") 914 req.add_unredirected_header("Host", "baz") 915 req.add_unredirected_header("Spam", "foo") 916 newreq = h.do_request_(req) 917 self.assertEqual(req.unredirected_hdrs["Content-length"], "foo") 918 self.assertEqual(req.unredirected_hdrs["Content-type"], "bar") 919 self.assertEqual(req.unredirected_hdrs["Host"], "baz") 920 self.assertEqual(req.unredirected_hdrs["Spam"], "foo") 921 922 def test_http_body_file(self): 923 # A regular file - chunked encoding is used unless Content Length is 924 # already set. 925 926 h = urllib.request.AbstractHTTPHandler() 927 o = h.parent = MockOpener() 928 929 file_obj = tempfile.NamedTemporaryFile(mode='w+b', delete=False) 930 file_path = file_obj.name 931 file_obj.close() 932 self.addCleanup(os.unlink, file_path) 933 934 with open(file_path, "rb") as f: 935 req = Request("http://example.com/", f, {}) 936 newreq = h.do_request_(req) 937 te = newreq.get_header('Transfer-encoding') 938 self.assertEqual(te, "chunked") 939 self.assertFalse(newreq.has_header('Content-length')) 940 941 with open(file_path, "rb") as f: 942 req = Request("http://example.com/", f, {"Content-Length": 30}) 943 newreq = h.do_request_(req) 944 self.assertEqual(int(newreq.get_header('Content-length')), 30) 945 self.assertFalse(newreq.has_header("Transfer-encoding")) 946 947 def test_http_body_fileobj(self): 948 # A file object - chunked encoding is used 949 # unless Content Length is already set. 950 # (Note that there are some subtle differences to a regular 951 # file, that is why we are testing both cases.) 952 953 h = urllib.request.AbstractHTTPHandler() 954 o = h.parent = MockOpener() 955 file_obj = io.BytesIO() 956 957 req = Request("http://example.com/", file_obj, {}) 958 newreq = h.do_request_(req) 959 self.assertEqual(newreq.get_header('Transfer-encoding'), 'chunked') 960 self.assertFalse(newreq.has_header('Content-length')) 961 962 headers = {"Content-Length": 30} 963 req = Request("http://example.com/", file_obj, headers) 964 newreq = h.do_request_(req) 965 self.assertEqual(int(newreq.get_header('Content-length')), 30) 966 self.assertFalse(newreq.has_header("Transfer-encoding")) 967 968 file_obj.close() 969 970 def test_http_body_pipe(self): 971 # A file reading from a pipe. 972 # A pipe cannot be seek'ed. There is no way to determine the 973 # content length up front. Thus, do_request_() should fall 974 # back to Transfer-encoding chunked. 975 976 h = urllib.request.AbstractHTTPHandler() 977 o = h.parent = MockOpener() 978 979 cmd = [sys.executable, "-c", r"pass"] 980 for headers in {}, {"Content-Length": 30}: 981 with subprocess.Popen(cmd, stdout=subprocess.PIPE) as proc: 982 req = Request("http://example.com/", proc.stdout, headers) 983 newreq = h.do_request_(req) 984 if not headers: 985 self.assertEqual(newreq.get_header('Content-length'), None) 986 self.assertEqual(newreq.get_header('Transfer-encoding'), 987 'chunked') 988 else: 989 self.assertEqual(int(newreq.get_header('Content-length')), 990 30) 991 992 def test_http_body_iterable(self): 993 # Generic iterable. There is no way to determine the content 994 # length up front. Fall back to Transfer-encoding chunked. 995 996 h = urllib.request.AbstractHTTPHandler() 997 o = h.parent = MockOpener() 998 999 def iterable_body(): 1000 yield b"one" 1001 1002 for headers in {}, {"Content-Length": 11}: 1003 req = Request("http://example.com/", iterable_body(), headers) 1004 newreq = h.do_request_(req) 1005 if not headers: 1006 self.assertEqual(newreq.get_header('Content-length'), None) 1007 self.assertEqual(newreq.get_header('Transfer-encoding'), 1008 'chunked') 1009 else: 1010 self.assertEqual(int(newreq.get_header('Content-length')), 11) 1011 1012 def test_http_body_empty_seq(self): 1013 # Zero-length iterable body should be treated like any other iterable 1014 h = urllib.request.AbstractHTTPHandler() 1015 h.parent = MockOpener() 1016 req = h.do_request_(Request("http://example.com/", ())) 1017 self.assertEqual(req.get_header("Transfer-encoding"), "chunked") 1018 self.assertFalse(req.has_header("Content-length")) 1019 1020 def test_http_body_array(self): 1021 # array.array Iterable - Content Length is calculated 1022 1023 h = urllib.request.AbstractHTTPHandler() 1024 o = h.parent = MockOpener() 1025 1026 iterable_array = array.array("I",[1,2,3,4]) 1027 1028 for headers in {}, {"Content-Length": 16}: 1029 req = Request("http://example.com/", iterable_array, headers) 1030 newreq = h.do_request_(req) 1031 self.assertEqual(int(newreq.get_header('Content-length')),16) 1032 1033 def test_http_handler_debuglevel(self): 1034 o = OpenerDirector() 1035 h = MockHTTPSHandler(debuglevel=1) 1036 o.add_handler(h) 1037 o.open("https://www.example.com") 1038 self.assertEqual(h._debuglevel, 1) 1039 1040 def test_http_doubleslash(self): 1041 # Checks the presence of any unnecessary double slash in url does not 1042 # break anything. Previously, a double slash directly after the host 1043 # could cause incorrect parsing. 1044 h = urllib.request.AbstractHTTPHandler() 1045 h.parent = MockOpener() 1046 1047 data = b"" 1048 ds_urls = [ 1049 "http://example.com/foo/bar/baz.html", 1050 "http://example.com//foo/bar/baz.html", 1051 "http://example.com/foo//bar/baz.html", 1052 "http://example.com/foo/bar//baz.html" 1053 ] 1054 1055 for ds_url in ds_urls: 1056 ds_req = Request(ds_url, data) 1057 1058 # Check whether host is determined correctly if there is no proxy 1059 np_ds_req = h.do_request_(ds_req) 1060 self.assertEqual(np_ds_req.unredirected_hdrs["Host"], "example.com") 1061 1062 # Check whether host is determined correctly if there is a proxy 1063 ds_req.set_proxy("someproxy:3128", None) 1064 p_ds_req = h.do_request_(ds_req) 1065 self.assertEqual(p_ds_req.unredirected_hdrs["Host"], "example.com") 1066 1067 def test_full_url_setter(self): 1068 # Checks to ensure that components are set correctly after setting the 1069 # full_url of a Request object 1070 1071 urls = [ 1072 'http://example.com?foo=bar#baz', 1073 'http://example.com?foo=bar&spam=eggs#bash', 1074 'http://example.com', 1075 ] 1076 1077 # testing a reusable request instance, but the url parameter is 1078 # required, so just use a dummy one to instantiate 1079 r = Request('http://example.com') 1080 for url in urls: 1081 r.full_url = url 1082 parsed = urlparse(url) 1083 1084 self.assertEqual(r.get_full_url(), url) 1085 # full_url setter uses splittag to split into components. 1086 # splittag sets the fragment as None while urlparse sets it to '' 1087 self.assertEqual(r.fragment or '', parsed.fragment) 1088 self.assertEqual(urlparse(r.get_full_url()).query, parsed.query) 1089 1090 def test_full_url_deleter(self): 1091 r = Request('http://www.example.com') 1092 del r.full_url 1093 self.assertIsNone(r.full_url) 1094 self.assertIsNone(r.fragment) 1095 self.assertEqual(r.selector, '') 1096 1097 def test_fixpath_in_weirdurls(self): 1098 # Issue4493: urllib2 to supply '/' when to urls where path does not 1099 # start with'/' 1100 1101 h = urllib.request.AbstractHTTPHandler() 1102 h.parent = MockOpener() 1103 1104 weird_url = 'http://www.python.org?getspam' 1105 req = Request(weird_url) 1106 newreq = h.do_request_(req) 1107 self.assertEqual(newreq.host, 'www.python.org') 1108 self.assertEqual(newreq.selector, '/?getspam') 1109 1110 url_without_path = 'http://www.python.org' 1111 req = Request(url_without_path) 1112 newreq = h.do_request_(req) 1113 self.assertEqual(newreq.host, 'www.python.org') 1114 self.assertEqual(newreq.selector, '') 1115 1116 def test_errors(self): 1117 h = urllib.request.HTTPErrorProcessor() 1118 o = h.parent = MockOpener() 1119 1120 url = "http://example.com/" 1121 req = Request(url) 1122 # all 2xx are passed through 1123 r = MockResponse(200, "OK", {}, "", url) 1124 newr = h.http_response(req, r) 1125 self.assertIs(r, newr) 1126 self.assertFalse(hasattr(o, "proto")) # o.error not called 1127 r = MockResponse(202, "Accepted", {}, "", url) 1128 newr = h.http_response(req, r) 1129 self.assertIs(r, newr) 1130 self.assertFalse(hasattr(o, "proto")) # o.error not called 1131 r = MockResponse(206, "Partial content", {}, "", url) 1132 newr = h.http_response(req, r) 1133 self.assertIs(r, newr) 1134 self.assertFalse(hasattr(o, "proto")) # o.error not called 1135 # anything else calls o.error (and MockOpener returns None, here) 1136 r = MockResponse(502, "Bad gateway", {}, "", url) 1137 self.assertIsNone(h.http_response(req, r)) 1138 self.assertEqual(o.proto, "http") # o.error called 1139 self.assertEqual(o.args, (req, r, 502, "Bad gateway", {})) 1140 1141 def test_cookies(self): 1142 cj = MockCookieJar() 1143 h = urllib.request.HTTPCookieProcessor(cj) 1144 h.parent = MockOpener() 1145 1146 req = Request("http://example.com/") 1147 r = MockResponse(200, "OK", {}, "") 1148 newreq = h.http_request(req) 1149 self.assertIs(cj.ach_req, req) 1150 self.assertIs(cj.ach_req, newreq) 1151 self.assertEqual(req.origin_req_host, "example.com") 1152 self.assertFalse(req.unverifiable) 1153 newr = h.http_response(req, r) 1154 self.assertIs(cj.ec_req, req) 1155 self.assertIs(cj.ec_r, r) 1156 self.assertIs(r, newr) 1157 1158 def test_redirect(self): 1159 from_url = "http://example.com/a.html" 1160 to_url = "http://example.com/b.html" 1161 h = urllib.request.HTTPRedirectHandler() 1162 o = h.parent = MockOpener() 1163 1164 # ordinary redirect behaviour 1165 for code in 301, 302, 303, 307: 1166 for data in None, "blah\nblah\n": 1167 method = getattr(h, "http_error_%s" % code) 1168 req = Request(from_url, data) 1169 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 1170 req.add_header("Nonsense", "viking=withhold") 1171 if data is not None: 1172 req.add_header("Content-Length", str(len(data))) 1173 req.add_unredirected_header("Spam", "spam") 1174 try: 1175 method(req, MockFile(), code, "Blah", 1176 MockHeaders({"location": to_url})) 1177 except urllib.error.HTTPError: 1178 # 307 in response to POST requires user OK 1179 self.assertEqual(code, 307) 1180 self.assertIsNotNone(data) 1181 self.assertEqual(o.req.get_full_url(), to_url) 1182 try: 1183 self.assertEqual(o.req.get_method(), "GET") 1184 except AttributeError: 1185 self.assertFalse(o.req.data) 1186 1187 # now it's a GET, there should not be headers regarding content 1188 # (possibly dragged from before being a POST) 1189 headers = [x.lower() for x in o.req.headers] 1190 self.assertNotIn("content-length", headers) 1191 self.assertNotIn("content-type", headers) 1192 1193 self.assertEqual(o.req.headers["Nonsense"], 1194 "viking=withhold") 1195 self.assertNotIn("Spam", o.req.headers) 1196 self.assertNotIn("Spam", o.req.unredirected_hdrs) 1197 1198 # loop detection 1199 req = Request(from_url) 1200 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 1201 1202 def redirect(h, req, url=to_url): 1203 h.http_error_302(req, MockFile(), 302, "Blah", 1204 MockHeaders({"location": url})) 1205 # Note that the *original* request shares the same record of 1206 # redirections with the sub-requests caused by the redirections. 1207 1208 # detect infinite loop redirect of a URL to itself 1209 req = Request(from_url, origin_req_host="example.com") 1210 count = 0 1211 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 1212 try: 1213 while 1: 1214 redirect(h, req, "http://example.com/") 1215 count = count + 1 1216 except urllib.error.HTTPError: 1217 # don't stop until max_repeats, because cookies may introduce state 1218 self.assertEqual(count, urllib.request.HTTPRedirectHandler.max_repeats) 1219 1220 # detect endless non-repeating chain of redirects 1221 req = Request(from_url, origin_req_host="example.com") 1222 count = 0 1223 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 1224 try: 1225 while 1: 1226 redirect(h, req, "http://example.com/%d" % count) 1227 count = count + 1 1228 except urllib.error.HTTPError: 1229 self.assertEqual(count, 1230 urllib.request.HTTPRedirectHandler.max_redirections) 1231 1232 def test_invalid_redirect(self): 1233 from_url = "http://example.com/a.html" 1234 valid_schemes = ['http','https','ftp'] 1235 invalid_schemes = ['file','imap','ldap'] 1236 schemeless_url = "example.com/b.html" 1237 h = urllib.request.HTTPRedirectHandler() 1238 o = h.parent = MockOpener() 1239 req = Request(from_url) 1240 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 1241 1242 for scheme in invalid_schemes: 1243 invalid_url = scheme + '://' + schemeless_url 1244 self.assertRaises(urllib.error.HTTPError, h.http_error_302, 1245 req, MockFile(), 302, "Security Loophole", 1246 MockHeaders({"location": invalid_url})) 1247 1248 for scheme in valid_schemes: 1249 valid_url = scheme + '://' + schemeless_url 1250 h.http_error_302(req, MockFile(), 302, "That's fine", 1251 MockHeaders({"location": valid_url})) 1252 self.assertEqual(o.req.get_full_url(), valid_url) 1253 1254 def test_relative_redirect(self): 1255 from_url = "http://example.com/a.html" 1256 relative_url = "/b.html" 1257 h = urllib.request.HTTPRedirectHandler() 1258 o = h.parent = MockOpener() 1259 req = Request(from_url) 1260 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 1261 1262 valid_url = urllib.parse.urljoin(from_url,relative_url) 1263 h.http_error_302(req, MockFile(), 302, "That's fine", 1264 MockHeaders({"location": valid_url})) 1265 self.assertEqual(o.req.get_full_url(), valid_url) 1266 1267 def test_cookie_redirect(self): 1268 # cookies shouldn't leak into redirected requests 1269 from http.cookiejar import CookieJar 1270 from test.test_http_cookiejar import interact_netscape 1271 1272 cj = CookieJar() 1273 interact_netscape(cj, "http://www.example.com/", "spam=eggs") 1274 hh = MockHTTPHandler(302, "Location: http://www.cracker.com/\r\n\r\n") 1275 hdeh = urllib.request.HTTPDefaultErrorHandler() 1276 hrh = urllib.request.HTTPRedirectHandler() 1277 cp = urllib.request.HTTPCookieProcessor(cj) 1278 o = build_test_opener(hh, hdeh, hrh, cp) 1279 o.open("http://www.example.com/") 1280 self.assertFalse(hh.req.has_header("Cookie")) 1281 1282 def test_redirect_fragment(self): 1283 redirected_url = 'http://www.example.com/index.html#OK\r\n\r\n' 1284 hh = MockHTTPHandler(302, 'Location: ' + redirected_url) 1285 hdeh = urllib.request.HTTPDefaultErrorHandler() 1286 hrh = urllib.request.HTTPRedirectHandler() 1287 o = build_test_opener(hh, hdeh, hrh) 1288 fp = o.open('http://www.example.com') 1289 self.assertEqual(fp.geturl(), redirected_url.strip()) 1290 1291 def test_redirect_no_path(self): 1292 # Issue 14132: Relative redirect strips original path 1293 real_class = http.client.HTTPConnection 1294 response1 = b"HTTP/1.1 302 Found\r\nLocation: ?query\r\n\r\n" 1295 http.client.HTTPConnection = test_urllib.fakehttp(response1) 1296 self.addCleanup(setattr, http.client, "HTTPConnection", real_class) 1297 urls = iter(("/path", "/path?query")) 1298 def request(conn, method, url, *pos, **kw): 1299 self.assertEqual(url, next(urls)) 1300 real_class.request(conn, method, url, *pos, **kw) 1301 # Change response for subsequent connection 1302 conn.__class__.fakedata = b"HTTP/1.1 200 OK\r\n\r\nHello!" 1303 http.client.HTTPConnection.request = request 1304 fp = urllib.request.urlopen("http://python.org/path") 1305 self.assertEqual(fp.geturl(), "http://python.org/path?query") 1306 1307 def test_redirect_encoding(self): 1308 # Some characters in the redirect target may need special handling, 1309 # but most ASCII characters should be treated as already encoded 1310 class Handler(urllib.request.HTTPHandler): 1311 def http_open(self, req): 1312 result = self.do_open(self.connection, req) 1313 self.last_buf = self.connection.buf 1314 # Set up a normal response for the next request 1315 self.connection = test_urllib.fakehttp( 1316 b'HTTP/1.1 200 OK\r\n' 1317 b'Content-Length: 3\r\n' 1318 b'\r\n' 1319 b'123' 1320 ) 1321 return result 1322 handler = Handler() 1323 opener = urllib.request.build_opener(handler) 1324 tests = ( 1325 (b'/p\xC3\xA5-dansk/', b'/p%C3%A5-dansk/'), 1326 (b'/spaced%20path/', b'/spaced%20path/'), 1327 (b'/spaced path/', b'/spaced%20path/'), 1328 (b'/?p\xC3\xA5-dansk', b'/?p%C3%A5-dansk'), 1329 ) 1330 for [location, result] in tests: 1331 with self.subTest(repr(location)): 1332 handler.connection = test_urllib.fakehttp( 1333 b'HTTP/1.1 302 Redirect\r\n' 1334 b'Location: ' + location + b'\r\n' 1335 b'\r\n' 1336 ) 1337 response = opener.open('http://example.com/') 1338 expected = b'GET ' + result + b' ' 1339 request = handler.last_buf 1340 self.assertTrue(request.startswith(expected), repr(request)) 1341 1342 def test_proxy(self): 1343 u = "proxy.example.com:3128" 1344 for d in dict(http=u), dict(HTTP=u): 1345 o = OpenerDirector() 1346 ph = urllib.request.ProxyHandler(d) 1347 o.add_handler(ph) 1348 meth_spec = [ 1349 [("http_open", "return response")] 1350 ] 1351 handlers = add_ordered_mock_handlers(o, meth_spec) 1352 1353 req = Request("http://acme.example.com/") 1354 self.assertEqual(req.host, "acme.example.com") 1355 o.open(req) 1356 self.assertEqual(req.host, u) 1357 self.assertEqual([(handlers[0], "http_open")], 1358 [tup[0:2] for tup in o.calls]) 1359 1360 def test_proxy_no_proxy(self): 1361 os.environ['no_proxy'] = 'python.org' 1362 o = OpenerDirector() 1363 ph = urllib.request.ProxyHandler(dict(http="proxy.example.com")) 1364 o.add_handler(ph) 1365 req = Request("http://www.perl.org/") 1366 self.assertEqual(req.host, "www.perl.org") 1367 o.open(req) 1368 self.assertEqual(req.host, "proxy.example.com") 1369 req = Request("http://www.python.org") 1370 self.assertEqual(req.host, "www.python.org") 1371 o.open(req) 1372 self.assertEqual(req.host, "www.python.org") 1373 del os.environ['no_proxy'] 1374 1375 def test_proxy_no_proxy_all(self): 1376 os.environ['no_proxy'] = '*' 1377 o = OpenerDirector() 1378 ph = urllib.request.ProxyHandler(dict(http="proxy.example.com")) 1379 o.add_handler(ph) 1380 req = Request("http://www.python.org") 1381 self.assertEqual(req.host, "www.python.org") 1382 o.open(req) 1383 self.assertEqual(req.host, "www.python.org") 1384 del os.environ['no_proxy'] 1385 1386 def test_proxy_https(self): 1387 o = OpenerDirector() 1388 ph = urllib.request.ProxyHandler(dict(https="proxy.example.com:3128")) 1389 o.add_handler(ph) 1390 meth_spec = [ 1391 [("https_open", "return response")] 1392 ] 1393 handlers = add_ordered_mock_handlers(o, meth_spec) 1394 1395 req = Request("https://www.example.com/") 1396 self.assertEqual(req.host, "www.example.com") 1397 o.open(req) 1398 self.assertEqual(req.host, "proxy.example.com:3128") 1399 self.assertEqual([(handlers[0], "https_open")], 1400 [tup[0:2] for tup in o.calls]) 1401 1402 def test_proxy_https_proxy_authorization(self): 1403 o = OpenerDirector() 1404 ph = urllib.request.ProxyHandler(dict(https='proxy.example.com:3128')) 1405 o.add_handler(ph) 1406 https_handler = MockHTTPSHandler() 1407 o.add_handler(https_handler) 1408 req = Request("https://www.example.com/") 1409 req.add_header("Proxy-Authorization", "FooBar") 1410 req.add_header("User-Agent", "Grail") 1411 self.assertEqual(req.host, "www.example.com") 1412 self.assertIsNone(req._tunnel_host) 1413 o.open(req) 1414 # Verify Proxy-Authorization gets tunneled to request. 1415 # httpsconn req_headers do not have the Proxy-Authorization header but 1416 # the req will have. 1417 self.assertNotIn(("Proxy-Authorization", "FooBar"), 1418 https_handler.httpconn.req_headers) 1419 self.assertIn(("User-Agent", "Grail"), 1420 https_handler.httpconn.req_headers) 1421 self.assertIsNotNone(req._tunnel_host) 1422 self.assertEqual(req.host, "proxy.example.com:3128") 1423 self.assertEqual(req.get_header("Proxy-authorization"), "FooBar") 1424 1425 @unittest.skipUnless(sys.platform == 'darwin', "only relevant for OSX") 1426 def test_osx_proxy_bypass(self): 1427 bypass = { 1428 'exclude_simple': False, 1429 'exceptions': ['foo.bar', '*.bar.com', '127.0.0.1', '10.10', 1430 '10.0/16'] 1431 } 1432 # Check hosts that should trigger the proxy bypass 1433 for host in ('foo.bar', 'www.bar.com', '127.0.0.1', '10.10.0.1', 1434 '10.0.0.1'): 1435 self.assertTrue(_proxy_bypass_macosx_sysconf(host, bypass), 1436 'expected bypass of %s to be True' % host) 1437 # Check hosts that should not trigger the proxy bypass 1438 for host in ('abc.foo.bar', 'bar.com', '127.0.0.2', '10.11.0.1', 1439 'notinbypass'): 1440 self.assertFalse(_proxy_bypass_macosx_sysconf(host, bypass), 1441 'expected bypass of %s to be False' % host) 1442 1443 # Check the exclude_simple flag 1444 bypass = {'exclude_simple': True, 'exceptions': []} 1445 self.assertTrue(_proxy_bypass_macosx_sysconf('test', bypass)) 1446 1447 def test_basic_auth(self, quote_char='"'): 1448 opener = OpenerDirector() 1449 password_manager = MockPasswordManager() 1450 auth_handler = urllib.request.HTTPBasicAuthHandler(password_manager) 1451 realm = "ACME Widget Store" 1452 http_handler = MockHTTPHandler( 1453 401, 'WWW-Authenticate: Basic realm=%s%s%s\r\n\r\n' % 1454 (quote_char, realm, quote_char)) 1455 opener.add_handler(auth_handler) 1456 opener.add_handler(http_handler) 1457 self._test_basic_auth(opener, auth_handler, "Authorization", 1458 realm, http_handler, password_manager, 1459 "http://acme.example.com/protected", 1460 "http://acme.example.com/protected", 1461 ) 1462 1463 def test_basic_auth_with_single_quoted_realm(self): 1464 self.test_basic_auth(quote_char="'") 1465 1466 def test_basic_auth_with_unquoted_realm(self): 1467 opener = OpenerDirector() 1468 password_manager = MockPasswordManager() 1469 auth_handler = urllib.request.HTTPBasicAuthHandler(password_manager) 1470 realm = "ACME Widget Store" 1471 http_handler = MockHTTPHandler( 1472 401, 'WWW-Authenticate: Basic realm=%s\r\n\r\n' % realm) 1473 opener.add_handler(auth_handler) 1474 opener.add_handler(http_handler) 1475 with self.assertWarns(UserWarning): 1476 self._test_basic_auth(opener, auth_handler, "Authorization", 1477 realm, http_handler, password_manager, 1478 "http://acme.example.com/protected", 1479 "http://acme.example.com/protected", 1480 ) 1481 1482 def test_proxy_basic_auth(self): 1483 opener = OpenerDirector() 1484 ph = urllib.request.ProxyHandler(dict(http="proxy.example.com:3128")) 1485 opener.add_handler(ph) 1486 password_manager = MockPasswordManager() 1487 auth_handler = urllib.request.ProxyBasicAuthHandler(password_manager) 1488 realm = "ACME Networks" 1489 http_handler = MockHTTPHandler( 1490 407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm) 1491 opener.add_handler(auth_handler) 1492 opener.add_handler(http_handler) 1493 self._test_basic_auth(opener, auth_handler, "Proxy-authorization", 1494 realm, http_handler, password_manager, 1495 "http://acme.example.com:3128/protected", 1496 "proxy.example.com:3128", 1497 ) 1498 1499 def test_basic_and_digest_auth_handlers(self): 1500 # HTTPDigestAuthHandler raised an exception if it couldn't handle a 40* 1501 # response (http://python.org/sf/1479302), where it should instead 1502 # return None to allow another handler (especially 1503 # HTTPBasicAuthHandler) to handle the response. 1504 1505 # Also (http://python.org/sf/14797027, RFC 2617 section 1.2), we must 1506 # try digest first (since it's the strongest auth scheme), so we record 1507 # order of calls here to check digest comes first: 1508 class RecordingOpenerDirector(OpenerDirector): 1509 def __init__(self): 1510 OpenerDirector.__init__(self) 1511 self.recorded = [] 1512 1513 def record(self, info): 1514 self.recorded.append(info) 1515 1516 class TestDigestAuthHandler(urllib.request.HTTPDigestAuthHandler): 1517 def http_error_401(self, *args, **kwds): 1518 self.parent.record("digest") 1519 urllib.request.HTTPDigestAuthHandler.http_error_401(self, 1520 *args, **kwds) 1521 1522 class TestBasicAuthHandler(urllib.request.HTTPBasicAuthHandler): 1523 def http_error_401(self, *args, **kwds): 1524 self.parent.record("basic") 1525 urllib.request.HTTPBasicAuthHandler.http_error_401(self, 1526 *args, **kwds) 1527 1528 opener = RecordingOpenerDirector() 1529 password_manager = MockPasswordManager() 1530 digest_handler = TestDigestAuthHandler(password_manager) 1531 basic_handler = TestBasicAuthHandler(password_manager) 1532 realm = "ACME Networks" 1533 http_handler = MockHTTPHandler( 1534 401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm) 1535 opener.add_handler(basic_handler) 1536 opener.add_handler(digest_handler) 1537 opener.add_handler(http_handler) 1538 1539 # check basic auth isn't blocked by digest handler failing 1540 self._test_basic_auth(opener, basic_handler, "Authorization", 1541 realm, http_handler, password_manager, 1542 "http://acme.example.com/protected", 1543 "http://acme.example.com/protected", 1544 ) 1545 # check digest was tried before basic (twice, because 1546 # _test_basic_auth called .open() twice) 1547 self.assertEqual(opener.recorded, ["digest", "basic"]*2) 1548 1549 def test_unsupported_auth_digest_handler(self): 1550 opener = OpenerDirector() 1551 # While using DigestAuthHandler 1552 digest_auth_handler = urllib.request.HTTPDigestAuthHandler(None) 1553 http_handler = MockHTTPHandler( 1554 401, 'WWW-Authenticate: Kerberos\r\n\r\n') 1555 opener.add_handler(digest_auth_handler) 1556 opener.add_handler(http_handler) 1557 self.assertRaises(ValueError, opener.open, "http://www.example.com") 1558 1559 def test_unsupported_auth_basic_handler(self): 1560 # While using BasicAuthHandler 1561 opener = OpenerDirector() 1562 basic_auth_handler = urllib.request.HTTPBasicAuthHandler(None) 1563 http_handler = MockHTTPHandler( 1564 401, 'WWW-Authenticate: NTLM\r\n\r\n') 1565 opener.add_handler(basic_auth_handler) 1566 opener.add_handler(http_handler) 1567 self.assertRaises(ValueError, opener.open, "http://www.example.com") 1568 1569 def _test_basic_auth(self, opener, auth_handler, auth_header, 1570 realm, http_handler, password_manager, 1571 request_url, protected_url): 1572 import base64 1573 user, password = "wile", "coyote" 1574 1575 # .add_password() fed through to password manager 1576 auth_handler.add_password(realm, request_url, user, password) 1577 self.assertEqual(realm, password_manager.realm) 1578 self.assertEqual(request_url, password_manager.url) 1579 self.assertEqual(user, password_manager.user) 1580 self.assertEqual(password, password_manager.password) 1581 1582 opener.open(request_url) 1583 1584 # should have asked the password manager for the username/password 1585 self.assertEqual(password_manager.target_realm, realm) 1586 self.assertEqual(password_manager.target_url, protected_url) 1587 1588 # expect one request without authorization, then one with 1589 self.assertEqual(len(http_handler.requests), 2) 1590 self.assertFalse(http_handler.requests[0].has_header(auth_header)) 1591 userpass = bytes('%s:%s' % (user, password), "ascii") 1592 auth_hdr_value = ('Basic ' + 1593 base64.encodebytes(userpass).strip().decode()) 1594 self.assertEqual(http_handler.requests[1].get_header(auth_header), 1595 auth_hdr_value) 1596 self.assertEqual(http_handler.requests[1].unredirected_hdrs[auth_header], 1597 auth_hdr_value) 1598 # if the password manager can't find a password, the handler won't 1599 # handle the HTTP auth error 1600 password_manager.user = password_manager.password = None 1601 http_handler.reset() 1602 opener.open(request_url) 1603 self.assertEqual(len(http_handler.requests), 1) 1604 self.assertFalse(http_handler.requests[0].has_header(auth_header)) 1605 1606 def test_basic_prior_auth_auto_send(self): 1607 # Assume already authenticated if is_authenticated=True 1608 # for APIs like Github that don't return 401 1609 1610 user, password = "wile", "coyote" 1611 request_url = "http://acme.example.com/protected" 1612 1613 http_handler = MockHTTPHandlerCheckAuth(200) 1614 1615 pwd_manager = HTTPPasswordMgrWithPriorAuth() 1616 auth_prior_handler = HTTPBasicAuthHandler(pwd_manager) 1617 auth_prior_handler.add_password( 1618 None, request_url, user, password, is_authenticated=True) 1619 1620 is_auth = pwd_manager.is_authenticated(request_url) 1621 self.assertTrue(is_auth) 1622 1623 opener = OpenerDirector() 1624 opener.add_handler(auth_prior_handler) 1625 opener.add_handler(http_handler) 1626 1627 opener.open(request_url) 1628 1629 # expect request to be sent with auth header 1630 self.assertTrue(http_handler.has_auth_header) 1631 1632 def test_basic_prior_auth_send_after_first_success(self): 1633 # Auto send auth header after authentication is successful once 1634 1635 user, password = 'wile', 'coyote' 1636 request_url = 'http://acme.example.com/protected' 1637 realm = 'ACME' 1638 1639 pwd_manager = HTTPPasswordMgrWithPriorAuth() 1640 auth_prior_handler = HTTPBasicAuthHandler(pwd_manager) 1641 auth_prior_handler.add_password(realm, request_url, user, password) 1642 1643 is_auth = pwd_manager.is_authenticated(request_url) 1644 self.assertFalse(is_auth) 1645 1646 opener = OpenerDirector() 1647 opener.add_handler(auth_prior_handler) 1648 1649 http_handler = MockHTTPHandler( 1650 401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % None) 1651 opener.add_handler(http_handler) 1652 1653 opener.open(request_url) 1654 1655 is_auth = pwd_manager.is_authenticated(request_url) 1656 self.assertTrue(is_auth) 1657 1658 http_handler = MockHTTPHandlerCheckAuth(200) 1659 self.assertFalse(http_handler.has_auth_header) 1660 1661 opener = OpenerDirector() 1662 opener.add_handler(auth_prior_handler) 1663 opener.add_handler(http_handler) 1664 1665 # After getting 200 from MockHTTPHandler 1666 # Next request sends header in the first request 1667 opener.open(request_url) 1668 1669 # expect request to be sent with auth header 1670 self.assertTrue(http_handler.has_auth_header) 1671 1672 def test_http_closed(self): 1673 """Test the connection is cleaned up when the response is closed""" 1674 for (transfer, data) in ( 1675 ("Connection: close", b"data"), 1676 ("Transfer-Encoding: chunked", b"4\r\ndata\r\n0\r\n\r\n"), 1677 ("Content-Length: 4", b"data"), 1678 ): 1679 header = "HTTP/1.1 200 OK\r\n{}\r\n\r\n".format(transfer) 1680 conn = test_urllib.fakehttp(header.encode() + data) 1681 handler = urllib.request.AbstractHTTPHandler() 1682 req = Request("http://dummy/") 1683 req.timeout = None 1684 with handler.do_open(conn, req) as resp: 1685 resp.read() 1686 self.assertTrue(conn.fakesock.closed, 1687 "Connection not closed with {!r}".format(transfer)) 1688 1689 def test_invalid_closed(self): 1690 """Test the connection is cleaned up after an invalid response""" 1691 conn = test_urllib.fakehttp(b"") 1692 handler = urllib.request.AbstractHTTPHandler() 1693 req = Request("http://dummy/") 1694 req.timeout = None 1695 with self.assertRaises(http.client.BadStatusLine): 1696 handler.do_open(conn, req) 1697 self.assertTrue(conn.fakesock.closed, "Connection not closed") 1698 1699 1700class MiscTests(unittest.TestCase): 1701 1702 def opener_has_handler(self, opener, handler_class): 1703 self.assertTrue(any(h.__class__ == handler_class 1704 for h in opener.handlers)) 1705 1706 def test_build_opener(self): 1707 class MyHTTPHandler(urllib.request.HTTPHandler): 1708 pass 1709 1710 class FooHandler(urllib.request.BaseHandler): 1711 def foo_open(self): 1712 pass 1713 1714 class BarHandler(urllib.request.BaseHandler): 1715 def bar_open(self): 1716 pass 1717 1718 build_opener = urllib.request.build_opener 1719 1720 o = build_opener(FooHandler, BarHandler) 1721 self.opener_has_handler(o, FooHandler) 1722 self.opener_has_handler(o, BarHandler) 1723 1724 # can take a mix of classes and instances 1725 o = build_opener(FooHandler, BarHandler()) 1726 self.opener_has_handler(o, FooHandler) 1727 self.opener_has_handler(o, BarHandler) 1728 1729 # subclasses of default handlers override default handlers 1730 o = build_opener(MyHTTPHandler) 1731 self.opener_has_handler(o, MyHTTPHandler) 1732 1733 # a particular case of overriding: default handlers can be passed 1734 # in explicitly 1735 o = build_opener() 1736 self.opener_has_handler(o, urllib.request.HTTPHandler) 1737 o = build_opener(urllib.request.HTTPHandler) 1738 self.opener_has_handler(o, urllib.request.HTTPHandler) 1739 o = build_opener(urllib.request.HTTPHandler()) 1740 self.opener_has_handler(o, urllib.request.HTTPHandler) 1741 1742 # Issue2670: multiple handlers sharing the same base class 1743 class MyOtherHTTPHandler(urllib.request.HTTPHandler): 1744 pass 1745 1746 o = build_opener(MyHTTPHandler, MyOtherHTTPHandler) 1747 self.opener_has_handler(o, MyHTTPHandler) 1748 self.opener_has_handler(o, MyOtherHTTPHandler) 1749 1750 @unittest.skipUnless(support.is_resource_enabled('network'), 1751 'test requires network access') 1752 def test_issue16464(self): 1753 with support.transient_internet("http://www.example.com/"): 1754 opener = urllib.request.build_opener() 1755 request = urllib.request.Request("http://www.example.com/") 1756 self.assertEqual(None, request.data) 1757 1758 opener.open(request, "1".encode("us-ascii")) 1759 self.assertEqual(b"1", request.data) 1760 self.assertEqual("1", request.get_header("Content-length")) 1761 1762 opener.open(request, "1234567890".encode("us-ascii")) 1763 self.assertEqual(b"1234567890", request.data) 1764 self.assertEqual("10", request.get_header("Content-length")) 1765 1766 def test_HTTPError_interface(self): 1767 """ 1768 Issue 13211 reveals that HTTPError didn't implement the URLError 1769 interface even though HTTPError is a subclass of URLError. 1770 """ 1771 msg = 'something bad happened' 1772 url = code = fp = None 1773 hdrs = 'Content-Length: 42' 1774 err = urllib.error.HTTPError(url, code, msg, hdrs, fp) 1775 self.assertTrue(hasattr(err, 'reason')) 1776 self.assertEqual(err.reason, 'something bad happened') 1777 self.assertTrue(hasattr(err, 'headers')) 1778 self.assertEqual(err.headers, 'Content-Length: 42') 1779 expected_errmsg = 'HTTP Error %s: %s' % (err.code, err.msg) 1780 self.assertEqual(str(err), expected_errmsg) 1781 expected_errmsg = '<HTTPError %s: %r>' % (err.code, err.msg) 1782 self.assertEqual(repr(err), expected_errmsg) 1783 1784 def test_parse_proxy(self): 1785 parse_proxy_test_cases = [ 1786 ('proxy.example.com', 1787 (None, None, None, 'proxy.example.com')), 1788 ('proxy.example.com:3128', 1789 (None, None, None, 'proxy.example.com:3128')), 1790 ('proxy.example.com', (None, None, None, 'proxy.example.com')), 1791 ('proxy.example.com:3128', 1792 (None, None, None, 'proxy.example.com:3128')), 1793 # The authority component may optionally include userinfo 1794 # (assumed to be # username:password): 1795 ('joe:password@proxy.example.com', 1796 (None, 'joe', 'password', 'proxy.example.com')), 1797 ('joe:password@proxy.example.com:3128', 1798 (None, 'joe', 'password', 'proxy.example.com:3128')), 1799 #Examples with URLS 1800 ('http://proxy.example.com/', 1801 ('http', None, None, 'proxy.example.com')), 1802 ('http://proxy.example.com:3128/', 1803 ('http', None, None, 'proxy.example.com:3128')), 1804 ('http://joe:password@proxy.example.com/', 1805 ('http', 'joe', 'password', 'proxy.example.com')), 1806 ('http://joe:password@proxy.example.com:3128', 1807 ('http', 'joe', 'password', 'proxy.example.com:3128')), 1808 # Everything after the authority is ignored 1809 ('ftp://joe:password@proxy.example.com/rubbish:3128', 1810 ('ftp', 'joe', 'password', 'proxy.example.com')), 1811 # Test for no trailing '/' case 1812 ('http://joe:password@proxy.example.com', 1813 ('http', 'joe', 'password', 'proxy.example.com')) 1814 ] 1815 1816 for tc, expected in parse_proxy_test_cases: 1817 self.assertEqual(_parse_proxy(tc), expected) 1818 1819 self.assertRaises(ValueError, _parse_proxy, 'file:/ftp.example.com'), 1820 1821 def test_unsupported_algorithm(self): 1822 handler = AbstractDigestAuthHandler() 1823 with self.assertRaises(ValueError) as exc: 1824 handler.get_algorithm_impls('invalid') 1825 self.assertEqual( 1826 str(exc.exception), 1827 "Unsupported digest authentication algorithm 'invalid'" 1828 ) 1829 1830 1831class RequestTests(unittest.TestCase): 1832 class PutRequest(Request): 1833 method = 'PUT' 1834 1835 def setUp(self): 1836 self.get = Request("http://www.python.org/~jeremy/") 1837 self.post = Request("http://www.python.org/~jeremy/", 1838 "data", 1839 headers={"X-Test": "test"}) 1840 self.head = Request("http://www.python.org/~jeremy/", method='HEAD') 1841 self.put = self.PutRequest("http://www.python.org/~jeremy/") 1842 self.force_post = self.PutRequest("http://www.python.org/~jeremy/", 1843 method="POST") 1844 1845 def test_method(self): 1846 self.assertEqual("POST", self.post.get_method()) 1847 self.assertEqual("GET", self.get.get_method()) 1848 self.assertEqual("HEAD", self.head.get_method()) 1849 self.assertEqual("PUT", self.put.get_method()) 1850 self.assertEqual("POST", self.force_post.get_method()) 1851 1852 def test_data(self): 1853 self.assertFalse(self.get.data) 1854 self.assertEqual("GET", self.get.get_method()) 1855 self.get.data = "spam" 1856 self.assertTrue(self.get.data) 1857 self.assertEqual("POST", self.get.get_method()) 1858 1859 # issue 16464 1860 # if we change data we need to remove content-length header 1861 # (cause it's most probably calculated for previous value) 1862 def test_setting_data_should_remove_content_length(self): 1863 self.assertNotIn("Content-length", self.get.unredirected_hdrs) 1864 self.get.add_unredirected_header("Content-length", 42) 1865 self.assertEqual(42, self.get.unredirected_hdrs["Content-length"]) 1866 self.get.data = "spam" 1867 self.assertNotIn("Content-length", self.get.unredirected_hdrs) 1868 1869 # issue 17485 same for deleting data. 1870 def test_deleting_data_should_remove_content_length(self): 1871 self.assertNotIn("Content-length", self.get.unredirected_hdrs) 1872 self.get.data = 'foo' 1873 self.get.add_unredirected_header("Content-length", 3) 1874 self.assertEqual(3, self.get.unredirected_hdrs["Content-length"]) 1875 del self.get.data 1876 self.assertNotIn("Content-length", self.get.unredirected_hdrs) 1877 1878 def test_get_full_url(self): 1879 self.assertEqual("http://www.python.org/~jeremy/", 1880 self.get.get_full_url()) 1881 1882 def test_selector(self): 1883 self.assertEqual("/~jeremy/", self.get.selector) 1884 req = Request("http://www.python.org/") 1885 self.assertEqual("/", req.selector) 1886 1887 def test_get_type(self): 1888 self.assertEqual("http", self.get.type) 1889 1890 def test_get_host(self): 1891 self.assertEqual("www.python.org", self.get.host) 1892 1893 def test_get_host_unquote(self): 1894 req = Request("http://www.%70ython.org/") 1895 self.assertEqual("www.python.org", req.host) 1896 1897 def test_proxy(self): 1898 self.assertFalse(self.get.has_proxy()) 1899 self.get.set_proxy("www.perl.org", "http") 1900 self.assertTrue(self.get.has_proxy()) 1901 self.assertEqual("www.python.org", self.get.origin_req_host) 1902 self.assertEqual("www.perl.org", self.get.host) 1903 1904 def test_wrapped_url(self): 1905 req = Request("<URL:http://www.python.org>") 1906 self.assertEqual("www.python.org", req.host) 1907 1908 def test_url_fragment(self): 1909 req = Request("http://www.python.org/?qs=query#fragment=true") 1910 self.assertEqual("/?qs=query", req.selector) 1911 req = Request("http://www.python.org/#fun=true") 1912 self.assertEqual("/", req.selector) 1913 1914 # Issue 11703: geturl() omits fragment in the original URL. 1915 url = 'http://docs.python.org/library/urllib2.html#OK' 1916 req = Request(url) 1917 self.assertEqual(req.get_full_url(), url) 1918 1919 def test_url_fullurl_get_full_url(self): 1920 urls = ['http://docs.python.org', 1921 'http://docs.python.org/library/urllib2.html#OK', 1922 'http://www.python.org/?qs=query#fragment=true'] 1923 for url in urls: 1924 req = Request(url) 1925 self.assertEqual(req.get_full_url(), req.full_url) 1926 1927 1928if __name__ == "__main__": 1929 unittest.main() 1930