1import unittest 2from test import support 3from test.support import os_helper 4from test.support import socket_helper 5from test.support import warnings_helper 6from test import test_urllib 7 8import os 9import io 10import socket 11import array 12import sys 13import tempfile 14import subprocess 15 16import urllib.request 17# The proxy bypass method imported below has logic specific to the OSX 18# proxy config data structure but is testable on all platforms. 19from urllib.request import (Request, OpenerDirector, HTTPBasicAuthHandler, 20 HTTPPasswordMgrWithPriorAuth, _parse_proxy, 21 _proxy_bypass_macosx_sysconf, 22 AbstractDigestAuthHandler) 23from urllib.parse import urlparse 24import urllib.error 25import http.client 26 27# XXX 28# Request 29# CacheFTPHandler (hard to write) 30# parse_keqv_list, parse_http_list, HTTPDigestAuthHandler 31 32 33class TrivialTests(unittest.TestCase): 34 35 def test___all__(self): 36 # Verify which names are exposed 37 for module in 'request', 'response', 'parse', 'error', 'robotparser': 38 context = {} 39 exec('from urllib.%s import *' % module, context) 40 del context['__builtins__'] 41 if module == 'request' and os.name == 'nt': 42 u, p = context.pop('url2pathname'), context.pop('pathname2url') 43 self.assertEqual(u.__module__, 'nturl2path') 44 self.assertEqual(p.__module__, 'nturl2path') 45 for k, v in context.items(): 46 self.assertEqual(v.__module__, 'urllib.%s' % module, 47 "%r is exposed in 'urllib.%s' but defined in %r" % 48 (k, module, v.__module__)) 49 50 def test_trivial(self): 51 # A couple trivial tests 52 53 # clear _opener global variable 54 self.addCleanup(urllib.request.urlcleanup) 55 56 self.assertRaises(ValueError, urllib.request.urlopen, 'bogus url') 57 58 # XXX Name hacking to get this to work on Windows. 59 fname = os.path.abspath(urllib.request.__file__).replace(os.sep, '/') 60 61 if os.name == 'nt': 62 file_url = "file:///%s" % fname 63 else: 64 file_url = "file://%s" % fname 65 66 with urllib.request.urlopen(file_url) as f: 67 f.read() 68 69 def test_parse_http_list(self): 70 tests = [ 71 ('a,b,c', ['a', 'b', 'c']), 72 ('path"o,l"og"i"cal, example', ['path"o,l"og"i"cal', 'example']), 73 ('a, b, "c", "d", "e,f", g, h', 74 ['a', 'b', '"c"', '"d"', '"e,f"', 'g', 'h']), 75 ('a="b\\"c", d="e\\,f", g="h\\\\i"', 76 ['a="b"c"', 'd="e,f"', 'g="h\\i"'])] 77 for string, list in tests: 78 self.assertEqual(urllib.request.parse_http_list(string), list) 79 80 def test_URLError_reasonstr(self): 81 err = urllib.error.URLError('reason') 82 self.assertIn(err.reason, str(err)) 83 84 85class RequestHdrsTests(unittest.TestCase): 86 87 def test_request_headers_dict(self): 88 """ 89 The Request.headers dictionary is not a documented interface. It 90 should stay that way, because the complete set of headers are only 91 accessible through the .get_header(), .has_header(), .header_items() 92 interface. However, .headers pre-dates those methods, and so real code 93 will be using the dictionary. 94 95 The introduction in 2.4 of those methods was a mistake for the same 96 reason: code that previously saw all (urllib2 user)-provided headers in 97 .headers now sees only a subset. 98 99 """ 100 url = "http://example.com" 101 self.assertEqual(Request(url, 102 headers={"Spam-eggs": "blah"} 103 ).headers["Spam-eggs"], "blah") 104 self.assertEqual(Request(url, 105 headers={"spam-EggS": "blah"} 106 ).headers["Spam-eggs"], "blah") 107 108 def test_request_headers_methods(self): 109 """ 110 Note the case normalization of header names here, to 111 .capitalize()-case. This should be preserved for 112 backwards-compatibility. (In the HTTP case, normalization to 113 .title()-case is done by urllib2 before sending headers to 114 http.client). 115 116 Note that e.g. r.has_header("spam-EggS") is currently False, and 117 r.get_header("spam-EggS") returns None, but that could be changed in 118 future. 119 120 Method r.remove_header should remove items both from r.headers and 121 r.unredirected_hdrs dictionaries 122 """ 123 url = "http://example.com" 124 req = Request(url, headers={"Spam-eggs": "blah"}) 125 self.assertTrue(req.has_header("Spam-eggs")) 126 self.assertEqual(req.header_items(), [('Spam-eggs', 'blah')]) 127 128 req.add_header("Foo-Bar", "baz") 129 self.assertEqual(sorted(req.header_items()), 130 [('Foo-bar', 'baz'), ('Spam-eggs', 'blah')]) 131 self.assertFalse(req.has_header("Not-there")) 132 self.assertIsNone(req.get_header("Not-there")) 133 self.assertEqual(req.get_header("Not-there", "default"), "default") 134 135 req.remove_header("Spam-eggs") 136 self.assertFalse(req.has_header("Spam-eggs")) 137 138 req.add_unredirected_header("Unredirected-spam", "Eggs") 139 self.assertTrue(req.has_header("Unredirected-spam")) 140 141 req.remove_header("Unredirected-spam") 142 self.assertFalse(req.has_header("Unredirected-spam")) 143 144 def test_password_manager(self): 145 mgr = urllib.request.HTTPPasswordMgr() 146 add = mgr.add_password 147 find_user_pass = mgr.find_user_password 148 149 add("Some Realm", "http://example.com/", "joe", "password") 150 add("Some Realm", "http://example.com/ni", "ni", "ni") 151 add("Some Realm", "http://c.example.com:3128", "3", "c") 152 add("Some Realm", "d.example.com", "4", "d") 153 add("Some Realm", "e.example.com:3128", "5", "e") 154 155 # For the same realm, password set the highest path is the winner. 156 self.assertEqual(find_user_pass("Some Realm", "example.com"), 157 ('joe', 'password')) 158 self.assertEqual(find_user_pass("Some Realm", "http://example.com/ni"), 159 ('joe', 'password')) 160 self.assertEqual(find_user_pass("Some Realm", "http://example.com"), 161 ('joe', 'password')) 162 self.assertEqual(find_user_pass("Some Realm", "http://example.com/"), 163 ('joe', 'password')) 164 self.assertEqual(find_user_pass("Some Realm", 165 "http://example.com/spam"), 166 ('joe', 'password')) 167 168 self.assertEqual(find_user_pass("Some Realm", 169 "http://example.com/spam/spam"), 170 ('joe', 'password')) 171 172 # You can have different passwords for different paths. 173 174 add("c", "http://example.com/foo", "foo", "ni") 175 add("c", "http://example.com/bar", "bar", "nini") 176 177 self.assertEqual(find_user_pass("c", "http://example.com/foo"), 178 ('foo', 'ni')) 179 180 self.assertEqual(find_user_pass("c", "http://example.com/bar"), 181 ('bar', 'nini')) 182 183 # For the same path, newer password should be considered. 184 185 add("b", "http://example.com/", "first", "blah") 186 add("b", "http://example.com/", "second", "spam") 187 188 self.assertEqual(find_user_pass("b", "http://example.com/"), 189 ('second', 'spam')) 190 191 # No special relationship between a.example.com and example.com: 192 193 add("a", "http://example.com", "1", "a") 194 self.assertEqual(find_user_pass("a", "http://example.com/"), 195 ('1', 'a')) 196 197 self.assertEqual(find_user_pass("a", "http://a.example.com/"), 198 (None, None)) 199 200 # Ports: 201 202 self.assertEqual(find_user_pass("Some Realm", "c.example.com"), 203 (None, None)) 204 self.assertEqual(find_user_pass("Some Realm", "c.example.com:3128"), 205 ('3', 'c')) 206 self.assertEqual( 207 find_user_pass("Some Realm", "http://c.example.com:3128"), 208 ('3', 'c')) 209 self.assertEqual(find_user_pass("Some Realm", "d.example.com"), 210 ('4', 'd')) 211 self.assertEqual(find_user_pass("Some Realm", "e.example.com:3128"), 212 ('5', 'e')) 213 214 def test_password_manager_default_port(self): 215 """ 216 The point to note here is that we can't guess the default port if 217 there's no scheme. This applies to both add_password and 218 find_user_password. 219 """ 220 mgr = urllib.request.HTTPPasswordMgr() 221 add = mgr.add_password 222 find_user_pass = mgr.find_user_password 223 add("f", "http://g.example.com:80", "10", "j") 224 add("g", "http://h.example.com", "11", "k") 225 add("h", "i.example.com:80", "12", "l") 226 add("i", "j.example.com", "13", "m") 227 self.assertEqual(find_user_pass("f", "g.example.com:100"), 228 (None, None)) 229 self.assertEqual(find_user_pass("f", "g.example.com:80"), 230 ('10', 'j')) 231 self.assertEqual(find_user_pass("f", "g.example.com"), 232 (None, None)) 233 self.assertEqual(find_user_pass("f", "http://g.example.com:100"), 234 (None, None)) 235 self.assertEqual(find_user_pass("f", "http://g.example.com:80"), 236 ('10', 'j')) 237 self.assertEqual(find_user_pass("f", "http://g.example.com"), 238 ('10', 'j')) 239 self.assertEqual(find_user_pass("g", "h.example.com"), ('11', 'k')) 240 self.assertEqual(find_user_pass("g", "h.example.com:80"), ('11', 'k')) 241 self.assertEqual(find_user_pass("g", "http://h.example.com:80"), 242 ('11', 'k')) 243 self.assertEqual(find_user_pass("h", "i.example.com"), (None, None)) 244 self.assertEqual(find_user_pass("h", "i.example.com:80"), ('12', 'l')) 245 self.assertEqual(find_user_pass("h", "http://i.example.com:80"), 246 ('12', 'l')) 247 self.assertEqual(find_user_pass("i", "j.example.com"), ('13', 'm')) 248 self.assertEqual(find_user_pass("i", "j.example.com:80"), 249 (None, None)) 250 self.assertEqual(find_user_pass("i", "http://j.example.com"), 251 ('13', 'm')) 252 self.assertEqual(find_user_pass("i", "http://j.example.com:80"), 253 (None, None)) 254 255 256class MockOpener: 257 addheaders = [] 258 259 def open(self, req, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT): 260 self.req, self.data, self.timeout = req, data, timeout 261 262 def error(self, proto, *args): 263 self.proto, self.args = proto, args 264 265 266class MockFile: 267 def read(self, count=None): 268 pass 269 270 def readline(self, count=None): 271 pass 272 273 def close(self): 274 pass 275 276 277class MockHeaders(dict): 278 def getheaders(self, name): 279 return list(self.values()) 280 281 282class MockResponse(io.StringIO): 283 def __init__(self, code, msg, headers, data, url=None): 284 io.StringIO.__init__(self, data) 285 self.code, self.msg, self.headers, self.url = code, msg, headers, url 286 287 def info(self): 288 return self.headers 289 290 def geturl(self): 291 return self.url 292 293 294class MockCookieJar: 295 def add_cookie_header(self, request): 296 self.ach_req = request 297 298 def extract_cookies(self, response, request): 299 self.ec_req, self.ec_r = request, response 300 301 302class FakeMethod: 303 def __init__(self, meth_name, action, handle): 304 self.meth_name = meth_name 305 self.handle = handle 306 self.action = action 307 308 def __call__(self, *args): 309 return self.handle(self.meth_name, self.action, *args) 310 311 312class MockHTTPResponse(io.IOBase): 313 def __init__(self, fp, msg, status, reason): 314 self.fp = fp 315 self.msg = msg 316 self.status = status 317 self.reason = reason 318 self.code = 200 319 320 def read(self): 321 return '' 322 323 def info(self): 324 return {} 325 326 def geturl(self): 327 return self.url 328 329 330class MockHTTPClass: 331 def __init__(self): 332 self.level = 0 333 self.req_headers = [] 334 self.data = None 335 self.raise_on_endheaders = False 336 self.sock = None 337 self._tunnel_headers = {} 338 339 def __call__(self, host, timeout=socket._GLOBAL_DEFAULT_TIMEOUT): 340 self.host = host 341 self.timeout = timeout 342 return self 343 344 def set_debuglevel(self, level): 345 self.level = level 346 347 def set_tunnel(self, host, port=None, headers=None): 348 self._tunnel_host = host 349 self._tunnel_port = port 350 if headers: 351 self._tunnel_headers = headers 352 else: 353 self._tunnel_headers.clear() 354 355 def request(self, method, url, body=None, headers=None, *, 356 encode_chunked=False): 357 self.method = method 358 self.selector = url 359 if headers is not None: 360 self.req_headers += headers.items() 361 self.req_headers.sort() 362 if body: 363 self.data = body 364 self.encode_chunked = encode_chunked 365 if self.raise_on_endheaders: 366 raise OSError() 367 368 def getresponse(self): 369 return MockHTTPResponse(MockFile(), {}, 200, "OK") 370 371 def close(self): 372 pass 373 374 375class MockHandler: 376 # useful for testing handler machinery 377 # see add_ordered_mock_handlers() docstring 378 handler_order = 500 379 380 def __init__(self, methods): 381 self._define_methods(methods) 382 383 def _define_methods(self, methods): 384 for spec in methods: 385 if len(spec) == 2: 386 name, action = spec 387 else: 388 name, action = spec, None 389 meth = FakeMethod(name, action, self.handle) 390 setattr(self.__class__, name, meth) 391 392 def handle(self, fn_name, action, *args, **kwds): 393 self.parent.calls.append((self, fn_name, args, kwds)) 394 if action is None: 395 return None 396 elif action == "return self": 397 return self 398 elif action == "return response": 399 res = MockResponse(200, "OK", {}, "") 400 return res 401 elif action == "return request": 402 return Request("http://blah/") 403 elif action.startswith("error"): 404 code = action[action.rfind(" ")+1:] 405 try: 406 code = int(code) 407 except ValueError: 408 pass 409 res = MockResponse(200, "OK", {}, "") 410 return self.parent.error("http", args[0], res, code, "", {}) 411 elif action == "raise": 412 raise urllib.error.URLError("blah") 413 assert False 414 415 def close(self): 416 pass 417 418 def add_parent(self, parent): 419 self.parent = parent 420 self.parent.calls = [] 421 422 def __lt__(self, other): 423 if not hasattr(other, "handler_order"): 424 # No handler_order, leave in original order. Yuck. 425 return True 426 return self.handler_order < other.handler_order 427 428 429def add_ordered_mock_handlers(opener, meth_spec): 430 """Create MockHandlers and add them to an OpenerDirector. 431 432 meth_spec: list of lists of tuples and strings defining methods to define 433 on handlers. eg: 434 435 [["http_error", "ftp_open"], ["http_open"]] 436 437 defines methods .http_error() and .ftp_open() on one handler, and 438 .http_open() on another. These methods just record their arguments and 439 return None. Using a tuple instead of a string causes the method to 440 perform some action (see MockHandler.handle()), eg: 441 442 [["http_error"], [("http_open", "return request")]] 443 444 defines .http_error() on one handler (which simply returns None), and 445 .http_open() on another handler, which returns a Request object. 446 447 """ 448 handlers = [] 449 count = 0 450 for meths in meth_spec: 451 class MockHandlerSubclass(MockHandler): 452 pass 453 454 h = MockHandlerSubclass(meths) 455 h.handler_order += count 456 h.add_parent(opener) 457 count = count + 1 458 handlers.append(h) 459 opener.add_handler(h) 460 return handlers 461 462 463def build_test_opener(*handler_instances): 464 opener = OpenerDirector() 465 for h in handler_instances: 466 opener.add_handler(h) 467 return opener 468 469 470class MockHTTPHandler(urllib.request.BaseHandler): 471 # useful for testing redirections and auth 472 # sends supplied headers and code as first response 473 # sends 200 OK as second response 474 def __init__(self, code, headers): 475 self.code = code 476 self.headers = headers 477 self.reset() 478 479 def reset(self): 480 self._count = 0 481 self.requests = [] 482 483 def http_open(self, req): 484 import email, copy 485 self.requests.append(copy.deepcopy(req)) 486 if self._count == 0: 487 self._count = self._count + 1 488 name = http.client.responses[self.code] 489 msg = email.message_from_string(self.headers) 490 return self.parent.error( 491 "http", req, MockFile(), self.code, name, msg) 492 else: 493 self.req = req 494 msg = email.message_from_string("\r\n\r\n") 495 return MockResponse(200, "OK", msg, "", req.get_full_url()) 496 497 498class MockHTTPSHandler(urllib.request.AbstractHTTPHandler): 499 # Useful for testing the Proxy-Authorization request by verifying the 500 # properties of httpcon 501 502 def __init__(self, debuglevel=0): 503 urllib.request.AbstractHTTPHandler.__init__(self, debuglevel=debuglevel) 504 self.httpconn = MockHTTPClass() 505 506 def https_open(self, req): 507 return self.do_open(self.httpconn, req) 508 509 510class MockHTTPHandlerCheckAuth(urllib.request.BaseHandler): 511 # useful for testing auth 512 # sends supplied code response 513 # checks if auth header is specified in request 514 def __init__(self, code): 515 self.code = code 516 self.has_auth_header = False 517 518 def reset(self): 519 self.has_auth_header = False 520 521 def http_open(self, req): 522 if req.has_header('Authorization'): 523 self.has_auth_header = True 524 name = http.client.responses[self.code] 525 return MockResponse(self.code, name, MockFile(), "", req.get_full_url()) 526 527 528 529class MockPasswordManager: 530 def add_password(self, realm, uri, user, password): 531 self.realm = realm 532 self.url = uri 533 self.user = user 534 self.password = password 535 536 def find_user_password(self, realm, authuri): 537 self.target_realm = realm 538 self.target_url = authuri 539 return self.user, self.password 540 541 542class OpenerDirectorTests(unittest.TestCase): 543 544 def test_add_non_handler(self): 545 class NonHandler(object): 546 pass 547 self.assertRaises(TypeError, 548 OpenerDirector().add_handler, NonHandler()) 549 550 def test_badly_named_methods(self): 551 # test work-around for three methods that accidentally follow the 552 # naming conventions for handler methods 553 # (*_open() / *_request() / *_response()) 554 555 # These used to call the accidentally-named methods, causing a 556 # TypeError in real code; here, returning self from these mock 557 # methods would either cause no exception, or AttributeError. 558 559 from urllib.error import URLError 560 561 o = OpenerDirector() 562 meth_spec = [ 563 [("do_open", "return self"), ("proxy_open", "return self")], 564 [("redirect_request", "return self")], 565 ] 566 add_ordered_mock_handlers(o, meth_spec) 567 o.add_handler(urllib.request.UnknownHandler()) 568 for scheme in "do", "proxy", "redirect": 569 self.assertRaises(URLError, o.open, scheme+"://example.com/") 570 571 def test_handled(self): 572 # handler returning non-None means no more handlers will be called 573 o = OpenerDirector() 574 meth_spec = [ 575 ["http_open", "ftp_open", "http_error_302"], 576 ["ftp_open"], 577 [("http_open", "return self")], 578 [("http_open", "return self")], 579 ] 580 handlers = add_ordered_mock_handlers(o, meth_spec) 581 582 req = Request("http://example.com/") 583 r = o.open(req) 584 # Second .http_open() gets called, third doesn't, since second returned 585 # non-None. Handlers without .http_open() never get any methods called 586 # on them. 587 # In fact, second mock handler defining .http_open() returns self 588 # (instead of response), which becomes the OpenerDirector's return 589 # value. 590 self.assertEqual(r, handlers[2]) 591 calls = [(handlers[0], "http_open"), (handlers[2], "http_open")] 592 for expected, got in zip(calls, o.calls): 593 handler, name, args, kwds = got 594 self.assertEqual((handler, name), expected) 595 self.assertEqual(args, (req,)) 596 597 def test_handler_order(self): 598 o = OpenerDirector() 599 handlers = [] 600 for meths, handler_order in [([("http_open", "return self")], 500), 601 (["http_open"], 0)]: 602 class MockHandlerSubclass(MockHandler): 603 pass 604 605 h = MockHandlerSubclass(meths) 606 h.handler_order = handler_order 607 handlers.append(h) 608 o.add_handler(h) 609 610 o.open("http://example.com/") 611 # handlers called in reverse order, thanks to their sort order 612 self.assertEqual(o.calls[0][0], handlers[1]) 613 self.assertEqual(o.calls[1][0], handlers[0]) 614 615 def test_raise(self): 616 # raising URLError stops processing of request 617 o = OpenerDirector() 618 meth_spec = [ 619 [("http_open", "raise")], 620 [("http_open", "return self")], 621 ] 622 handlers = add_ordered_mock_handlers(o, meth_spec) 623 624 req = Request("http://example.com/") 625 self.assertRaises(urllib.error.URLError, o.open, req) 626 self.assertEqual(o.calls, [(handlers[0], "http_open", (req,), {})]) 627 628 def test_http_error(self): 629 # XXX http_error_default 630 # http errors are a special case 631 o = OpenerDirector() 632 meth_spec = [ 633 [("http_open", "error 302")], 634 [("http_error_400", "raise"), "http_open"], 635 [("http_error_302", "return response"), "http_error_303", 636 "http_error"], 637 [("http_error_302")], 638 ] 639 handlers = add_ordered_mock_handlers(o, meth_spec) 640 req = Request("http://example.com/") 641 o.open(req) 642 assert len(o.calls) == 2 643 calls = [(handlers[0], "http_open", (req,)), 644 (handlers[2], "http_error_302", 645 (req, support.ALWAYS_EQ, 302, "", {}))] 646 for expected, got in zip(calls, o.calls): 647 handler, method_name, args = expected 648 self.assertEqual((handler, method_name), got[:2]) 649 self.assertEqual(args, got[2]) 650 651 def test_processors(self): 652 # *_request / *_response methods get called appropriately 653 o = OpenerDirector() 654 meth_spec = [ 655 [("http_request", "return request"), 656 ("http_response", "return response")], 657 [("http_request", "return request"), 658 ("http_response", "return response")], 659 ] 660 handlers = add_ordered_mock_handlers(o, meth_spec) 661 662 req = Request("http://example.com/") 663 o.open(req) 664 # processor methods are called on *all* handlers that define them, 665 # not just the first handler that handles the request 666 calls = [ 667 (handlers[0], "http_request"), (handlers[1], "http_request"), 668 (handlers[0], "http_response"), (handlers[1], "http_response")] 669 670 for i, (handler, name, args, kwds) in enumerate(o.calls): 671 if i < 2: 672 # *_request 673 self.assertEqual((handler, name), calls[i]) 674 self.assertEqual(len(args), 1) 675 self.assertIsInstance(args[0], Request) 676 else: 677 # *_response 678 self.assertEqual((handler, name), calls[i]) 679 self.assertEqual(len(args), 2) 680 self.assertIsInstance(args[0], Request) 681 # response from opener.open is None, because there's no 682 # handler that defines http_open to handle it 683 if args[1] is not None: 684 self.assertIsInstance(args[1], MockResponse) 685 686 687def sanepathname2url(path): 688 try: 689 path.encode("utf-8") 690 except UnicodeEncodeError: 691 raise unittest.SkipTest("path is not encodable to utf8") 692 urlpath = urllib.request.pathname2url(path) 693 if os.name == "nt" and urlpath.startswith("///"): 694 urlpath = urlpath[2:] 695 # XXX don't ask me about the mac... 696 return urlpath 697 698 699class HandlerTests(unittest.TestCase): 700 701 def test_ftp(self): 702 class MockFTPWrapper: 703 def __init__(self, data): 704 self.data = data 705 706 def retrfile(self, filename, filetype): 707 self.filename, self.filetype = filename, filetype 708 return io.StringIO(self.data), len(self.data) 709 710 def close(self): 711 pass 712 713 class NullFTPHandler(urllib.request.FTPHandler): 714 def __init__(self, data): 715 self.data = data 716 717 def connect_ftp(self, user, passwd, host, port, dirs, 718 timeout=socket._GLOBAL_DEFAULT_TIMEOUT): 719 self.user, self.passwd = user, passwd 720 self.host, self.port = host, port 721 self.dirs = dirs 722 self.ftpwrapper = MockFTPWrapper(self.data) 723 return self.ftpwrapper 724 725 import ftplib 726 data = "rheum rhaponicum" 727 h = NullFTPHandler(data) 728 h.parent = MockOpener() 729 730 for url, host, port, user, passwd, type_, dirs, filename, mimetype in [ 731 ("ftp://localhost/foo/bar/baz.html", 732 "localhost", ftplib.FTP_PORT, "", "", "I", 733 ["foo", "bar"], "baz.html", "text/html"), 734 ("ftp://parrot@localhost/foo/bar/baz.html", 735 "localhost", ftplib.FTP_PORT, "parrot", "", "I", 736 ["foo", "bar"], "baz.html", "text/html"), 737 ("ftp://%25parrot@localhost/foo/bar/baz.html", 738 "localhost", ftplib.FTP_PORT, "%parrot", "", "I", 739 ["foo", "bar"], "baz.html", "text/html"), 740 ("ftp://%2542parrot@localhost/foo/bar/baz.html", 741 "localhost", ftplib.FTP_PORT, "%42parrot", "", "I", 742 ["foo", "bar"], "baz.html", "text/html"), 743 ("ftp://localhost:80/foo/bar/", 744 "localhost", 80, "", "", "D", 745 ["foo", "bar"], "", None), 746 ("ftp://localhost/baz.gif;type=a", 747 "localhost", ftplib.FTP_PORT, "", "", "A", 748 [], "baz.gif", None), # XXX really this should guess image/gif 749 ]: 750 req = Request(url) 751 req.timeout = None 752 r = h.ftp_open(req) 753 # ftp authentication not yet implemented by FTPHandler 754 self.assertEqual(h.user, user) 755 self.assertEqual(h.passwd, passwd) 756 self.assertEqual(h.host, socket.gethostbyname(host)) 757 self.assertEqual(h.port, port) 758 self.assertEqual(h.dirs, dirs) 759 self.assertEqual(h.ftpwrapper.filename, filename) 760 self.assertEqual(h.ftpwrapper.filetype, type_) 761 headers = r.info() 762 self.assertEqual(headers.get("Content-type"), mimetype) 763 self.assertEqual(int(headers["Content-length"]), len(data)) 764 765 def test_file(self): 766 import email.utils 767 h = urllib.request.FileHandler() 768 o = h.parent = MockOpener() 769 770 TESTFN = os_helper.TESTFN 771 urlpath = sanepathname2url(os.path.abspath(TESTFN)) 772 towrite = b"hello, world\n" 773 urls = [ 774 "file://localhost%s" % urlpath, 775 "file://%s" % urlpath, 776 "file://%s%s" % (socket.gethostbyname('localhost'), urlpath), 777 ] 778 try: 779 localaddr = socket.gethostbyname(socket.gethostname()) 780 except socket.gaierror: 781 localaddr = '' 782 if localaddr: 783 urls.append("file://%s%s" % (localaddr, urlpath)) 784 785 for url in urls: 786 f = open(TESTFN, "wb") 787 try: 788 try: 789 f.write(towrite) 790 finally: 791 f.close() 792 793 r = h.file_open(Request(url)) 794 try: 795 data = r.read() 796 headers = r.info() 797 respurl = r.geturl() 798 finally: 799 r.close() 800 stats = os.stat(TESTFN) 801 modified = email.utils.formatdate(stats.st_mtime, usegmt=True) 802 finally: 803 os.remove(TESTFN) 804 self.assertEqual(data, towrite) 805 self.assertEqual(headers["Content-type"], "text/plain") 806 self.assertEqual(headers["Content-length"], "13") 807 self.assertEqual(headers["Last-modified"], modified) 808 self.assertEqual(respurl, url) 809 810 for url in [ 811 "file://localhost:80%s" % urlpath, 812 "file:///file_does_not_exist.txt", 813 "file://not-a-local-host.com//dir/file.txt", 814 "file://%s:80%s/%s" % (socket.gethostbyname('localhost'), 815 os.getcwd(), TESTFN), 816 "file://somerandomhost.ontheinternet.com%s/%s" % 817 (os.getcwd(), TESTFN), 818 ]: 819 try: 820 f = open(TESTFN, "wb") 821 try: 822 f.write(towrite) 823 finally: 824 f.close() 825 826 self.assertRaises(urllib.error.URLError, 827 h.file_open, Request(url)) 828 finally: 829 os.remove(TESTFN) 830 831 h = urllib.request.FileHandler() 832 o = h.parent = MockOpener() 833 # XXXX why does // mean ftp (and /// mean not ftp!), and where 834 # is file: scheme specified? I think this is really a bug, and 835 # what was intended was to distinguish between URLs like: 836 # file:/blah.txt (a file) 837 # file://localhost/blah.txt (a file) 838 # file:///blah.txt (a file) 839 # file://ftp.example.com/blah.txt (an ftp URL) 840 for url, ftp in [ 841 ("file://ftp.example.com//foo.txt", False), 842 ("file://ftp.example.com///foo.txt", False), 843 ("file://ftp.example.com/foo.txt", False), 844 ("file://somehost//foo/something.txt", False), 845 ("file://localhost//foo/something.txt", False), 846 ]: 847 req = Request(url) 848 try: 849 h.file_open(req) 850 except urllib.error.URLError: 851 self.assertFalse(ftp) 852 else: 853 self.assertIs(o.req, req) 854 self.assertEqual(req.type, "ftp") 855 self.assertEqual(req.type == "ftp", ftp) 856 857 def test_http(self): 858 859 h = urllib.request.AbstractHTTPHandler() 860 o = h.parent = MockOpener() 861 862 url = "http://example.com/" 863 for method, data in [("GET", None), ("POST", b"blah")]: 864 req = Request(url, data, {"Foo": "bar"}) 865 req.timeout = None 866 req.add_unredirected_header("Spam", "eggs") 867 http = MockHTTPClass() 868 r = h.do_open(http, req) 869 870 # result attributes 871 r.read; r.readline # wrapped MockFile methods 872 r.info; r.geturl # addinfourl methods 873 r.code, r.msg == 200, "OK" # added from MockHTTPClass.getreply() 874 hdrs = r.info() 875 hdrs.get; hdrs.__contains__ # r.info() gives dict from .getreply() 876 self.assertEqual(r.geturl(), url) 877 878 self.assertEqual(http.host, "example.com") 879 self.assertEqual(http.level, 0) 880 self.assertEqual(http.method, method) 881 self.assertEqual(http.selector, "/") 882 self.assertEqual(http.req_headers, 883 [("Connection", "close"), 884 ("Foo", "bar"), ("Spam", "eggs")]) 885 self.assertEqual(http.data, data) 886 887 # check OSError converted to URLError 888 http.raise_on_endheaders = True 889 self.assertRaises(urllib.error.URLError, h.do_open, http, req) 890 891 # Check for TypeError on POST data which is str. 892 req = Request("http://example.com/","badpost") 893 self.assertRaises(TypeError, h.do_request_, req) 894 895 # check adding of standard headers 896 o.addheaders = [("Spam", "eggs")] 897 for data in b"", None: # POST, GET 898 req = Request("http://example.com/", data) 899 r = MockResponse(200, "OK", {}, "") 900 newreq = h.do_request_(req) 901 if data is None: # GET 902 self.assertNotIn("Content-length", req.unredirected_hdrs) 903 self.assertNotIn("Content-type", req.unredirected_hdrs) 904 else: # POST 905 self.assertEqual(req.unredirected_hdrs["Content-length"], "0") 906 self.assertEqual(req.unredirected_hdrs["Content-type"], 907 "application/x-www-form-urlencoded") 908 # XXX the details of Host could be better tested 909 self.assertEqual(req.unredirected_hdrs["Host"], "example.com") 910 self.assertEqual(req.unredirected_hdrs["Spam"], "eggs") 911 912 # don't clobber existing headers 913 req.add_unredirected_header("Content-length", "foo") 914 req.add_unredirected_header("Content-type", "bar") 915 req.add_unredirected_header("Host", "baz") 916 req.add_unredirected_header("Spam", "foo") 917 newreq = h.do_request_(req) 918 self.assertEqual(req.unredirected_hdrs["Content-length"], "foo") 919 self.assertEqual(req.unredirected_hdrs["Content-type"], "bar") 920 self.assertEqual(req.unredirected_hdrs["Host"], "baz") 921 self.assertEqual(req.unredirected_hdrs["Spam"], "foo") 922 923 def test_http_body_file(self): 924 # A regular file - chunked encoding is used unless Content Length is 925 # already set. 926 927 h = urllib.request.AbstractHTTPHandler() 928 o = h.parent = MockOpener() 929 930 file_obj = tempfile.NamedTemporaryFile(mode='w+b', delete=False) 931 file_path = file_obj.name 932 file_obj.close() 933 self.addCleanup(os.unlink, file_path) 934 935 with open(file_path, "rb") as f: 936 req = Request("http://example.com/", f, {}) 937 newreq = h.do_request_(req) 938 te = newreq.get_header('Transfer-encoding') 939 self.assertEqual(te, "chunked") 940 self.assertFalse(newreq.has_header('Content-length')) 941 942 with open(file_path, "rb") as f: 943 req = Request("http://example.com/", f, {"Content-Length": 30}) 944 newreq = h.do_request_(req) 945 self.assertEqual(int(newreq.get_header('Content-length')), 30) 946 self.assertFalse(newreq.has_header("Transfer-encoding")) 947 948 def test_http_body_fileobj(self): 949 # A file object - chunked encoding is used 950 # unless Content Length is already set. 951 # (Note that there are some subtle differences to a regular 952 # file, that is why we are testing both cases.) 953 954 h = urllib.request.AbstractHTTPHandler() 955 o = h.parent = MockOpener() 956 file_obj = io.BytesIO() 957 958 req = Request("http://example.com/", file_obj, {}) 959 newreq = h.do_request_(req) 960 self.assertEqual(newreq.get_header('Transfer-encoding'), 'chunked') 961 self.assertFalse(newreq.has_header('Content-length')) 962 963 headers = {"Content-Length": 30} 964 req = Request("http://example.com/", file_obj, headers) 965 newreq = h.do_request_(req) 966 self.assertEqual(int(newreq.get_header('Content-length')), 30) 967 self.assertFalse(newreq.has_header("Transfer-encoding")) 968 969 file_obj.close() 970 971 def test_http_body_pipe(self): 972 # A file reading from a pipe. 973 # A pipe cannot be seek'ed. There is no way to determine the 974 # content length up front. Thus, do_request_() should fall 975 # back to Transfer-encoding chunked. 976 977 h = urllib.request.AbstractHTTPHandler() 978 o = h.parent = MockOpener() 979 980 cmd = [sys.executable, "-c", r"pass"] 981 for headers in {}, {"Content-Length": 30}: 982 with subprocess.Popen(cmd, stdout=subprocess.PIPE) as proc: 983 req = Request("http://example.com/", proc.stdout, headers) 984 newreq = h.do_request_(req) 985 if not headers: 986 self.assertEqual(newreq.get_header('Content-length'), None) 987 self.assertEqual(newreq.get_header('Transfer-encoding'), 988 'chunked') 989 else: 990 self.assertEqual(int(newreq.get_header('Content-length')), 991 30) 992 993 def test_http_body_iterable(self): 994 # Generic iterable. There is no way to determine the content 995 # length up front. Fall back to Transfer-encoding chunked. 996 997 h = urllib.request.AbstractHTTPHandler() 998 o = h.parent = MockOpener() 999 1000 def iterable_body(): 1001 yield b"one" 1002 1003 for headers in {}, {"Content-Length": 11}: 1004 req = Request("http://example.com/", iterable_body(), headers) 1005 newreq = h.do_request_(req) 1006 if not headers: 1007 self.assertEqual(newreq.get_header('Content-length'), None) 1008 self.assertEqual(newreq.get_header('Transfer-encoding'), 1009 'chunked') 1010 else: 1011 self.assertEqual(int(newreq.get_header('Content-length')), 11) 1012 1013 def test_http_body_empty_seq(self): 1014 # Zero-length iterable body should be treated like any other iterable 1015 h = urllib.request.AbstractHTTPHandler() 1016 h.parent = MockOpener() 1017 req = h.do_request_(Request("http://example.com/", ())) 1018 self.assertEqual(req.get_header("Transfer-encoding"), "chunked") 1019 self.assertFalse(req.has_header("Content-length")) 1020 1021 def test_http_body_array(self): 1022 # array.array Iterable - Content Length is calculated 1023 1024 h = urllib.request.AbstractHTTPHandler() 1025 o = h.parent = MockOpener() 1026 1027 iterable_array = array.array("I",[1,2,3,4]) 1028 1029 for headers in {}, {"Content-Length": 16}: 1030 req = Request("http://example.com/", iterable_array, headers) 1031 newreq = h.do_request_(req) 1032 self.assertEqual(int(newreq.get_header('Content-length')),16) 1033 1034 def test_http_handler_debuglevel(self): 1035 o = OpenerDirector() 1036 h = MockHTTPSHandler(debuglevel=1) 1037 o.add_handler(h) 1038 o.open("https://www.example.com") 1039 self.assertEqual(h._debuglevel, 1) 1040 1041 def test_http_doubleslash(self): 1042 # Checks the presence of any unnecessary double slash in url does not 1043 # break anything. Previously, a double slash directly after the host 1044 # could cause incorrect parsing. 1045 h = urllib.request.AbstractHTTPHandler() 1046 h.parent = MockOpener() 1047 1048 data = b"" 1049 ds_urls = [ 1050 "http://example.com/foo/bar/baz.html", 1051 "http://example.com//foo/bar/baz.html", 1052 "http://example.com/foo//bar/baz.html", 1053 "http://example.com/foo/bar//baz.html" 1054 ] 1055 1056 for ds_url in ds_urls: 1057 ds_req = Request(ds_url, data) 1058 1059 # Check whether host is determined correctly if there is no proxy 1060 np_ds_req = h.do_request_(ds_req) 1061 self.assertEqual(np_ds_req.unredirected_hdrs["Host"], "example.com") 1062 1063 # Check whether host is determined correctly if there is a proxy 1064 ds_req.set_proxy("someproxy:3128", None) 1065 p_ds_req = h.do_request_(ds_req) 1066 self.assertEqual(p_ds_req.unredirected_hdrs["Host"], "example.com") 1067 1068 def test_full_url_setter(self): 1069 # Checks to ensure that components are set correctly after setting the 1070 # full_url of a Request object 1071 1072 urls = [ 1073 'http://example.com?foo=bar#baz', 1074 'http://example.com?foo=bar&spam=eggs#bash', 1075 'http://example.com', 1076 ] 1077 1078 # testing a reusable request instance, but the url parameter is 1079 # required, so just use a dummy one to instantiate 1080 r = Request('http://example.com') 1081 for url in urls: 1082 r.full_url = url 1083 parsed = urlparse(url) 1084 1085 self.assertEqual(r.get_full_url(), url) 1086 # full_url setter uses splittag to split into components. 1087 # splittag sets the fragment as None while urlparse sets it to '' 1088 self.assertEqual(r.fragment or '', parsed.fragment) 1089 self.assertEqual(urlparse(r.get_full_url()).query, parsed.query) 1090 1091 def test_full_url_deleter(self): 1092 r = Request('http://www.example.com') 1093 del r.full_url 1094 self.assertIsNone(r.full_url) 1095 self.assertIsNone(r.fragment) 1096 self.assertEqual(r.selector, '') 1097 1098 def test_fixpath_in_weirdurls(self): 1099 # Issue4493: urllib2 to supply '/' when to urls where path does not 1100 # start with'/' 1101 1102 h = urllib.request.AbstractHTTPHandler() 1103 h.parent = MockOpener() 1104 1105 weird_url = 'http://www.python.org?getspam' 1106 req = Request(weird_url) 1107 newreq = h.do_request_(req) 1108 self.assertEqual(newreq.host, 'www.python.org') 1109 self.assertEqual(newreq.selector, '/?getspam') 1110 1111 url_without_path = 'http://www.python.org' 1112 req = Request(url_without_path) 1113 newreq = h.do_request_(req) 1114 self.assertEqual(newreq.host, 'www.python.org') 1115 self.assertEqual(newreq.selector, '') 1116 1117 def test_errors(self): 1118 h = urllib.request.HTTPErrorProcessor() 1119 o = h.parent = MockOpener() 1120 1121 url = "http://example.com/" 1122 req = Request(url) 1123 # all 2xx are passed through 1124 r = MockResponse(200, "OK", {}, "", url) 1125 newr = h.http_response(req, r) 1126 self.assertIs(r, newr) 1127 self.assertFalse(hasattr(o, "proto")) # o.error not called 1128 r = MockResponse(202, "Accepted", {}, "", url) 1129 newr = h.http_response(req, r) 1130 self.assertIs(r, newr) 1131 self.assertFalse(hasattr(o, "proto")) # o.error not called 1132 r = MockResponse(206, "Partial content", {}, "", url) 1133 newr = h.http_response(req, r) 1134 self.assertIs(r, newr) 1135 self.assertFalse(hasattr(o, "proto")) # o.error not called 1136 # anything else calls o.error (and MockOpener returns None, here) 1137 r = MockResponse(502, "Bad gateway", {}, "", url) 1138 self.assertIsNone(h.http_response(req, r)) 1139 self.assertEqual(o.proto, "http") # o.error called 1140 self.assertEqual(o.args, (req, r, 502, "Bad gateway", {})) 1141 1142 def test_cookies(self): 1143 cj = MockCookieJar() 1144 h = urllib.request.HTTPCookieProcessor(cj) 1145 h.parent = MockOpener() 1146 1147 req = Request("http://example.com/") 1148 r = MockResponse(200, "OK", {}, "") 1149 newreq = h.http_request(req) 1150 self.assertIs(cj.ach_req, req) 1151 self.assertIs(cj.ach_req, newreq) 1152 self.assertEqual(req.origin_req_host, "example.com") 1153 self.assertFalse(req.unverifiable) 1154 newr = h.http_response(req, r) 1155 self.assertIs(cj.ec_req, req) 1156 self.assertIs(cj.ec_r, r) 1157 self.assertIs(r, newr) 1158 1159 def test_redirect(self): 1160 from_url = "http://example.com/a.html" 1161 to_url = "http://example.com/b.html" 1162 h = urllib.request.HTTPRedirectHandler() 1163 o = h.parent = MockOpener() 1164 1165 # ordinary redirect behaviour 1166 for code in 301, 302, 303, 307: 1167 for data in None, "blah\nblah\n": 1168 method = getattr(h, "http_error_%s" % code) 1169 req = Request(from_url, data) 1170 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 1171 req.add_header("Nonsense", "viking=withhold") 1172 if data is not None: 1173 req.add_header("Content-Length", str(len(data))) 1174 req.add_unredirected_header("Spam", "spam") 1175 try: 1176 method(req, MockFile(), code, "Blah", 1177 MockHeaders({"location": to_url})) 1178 except urllib.error.HTTPError: 1179 # 307 in response to POST requires user OK 1180 self.assertEqual(code, 307) 1181 self.assertIsNotNone(data) 1182 self.assertEqual(o.req.get_full_url(), to_url) 1183 try: 1184 self.assertEqual(o.req.get_method(), "GET") 1185 except AttributeError: 1186 self.assertFalse(o.req.data) 1187 1188 # now it's a GET, there should not be headers regarding content 1189 # (possibly dragged from before being a POST) 1190 headers = [x.lower() for x in o.req.headers] 1191 self.assertNotIn("content-length", headers) 1192 self.assertNotIn("content-type", headers) 1193 1194 self.assertEqual(o.req.headers["Nonsense"], 1195 "viking=withhold") 1196 self.assertNotIn("Spam", o.req.headers) 1197 self.assertNotIn("Spam", o.req.unredirected_hdrs) 1198 1199 # loop detection 1200 req = Request(from_url) 1201 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 1202 1203 def redirect(h, req, url=to_url): 1204 h.http_error_302(req, MockFile(), 302, "Blah", 1205 MockHeaders({"location": url})) 1206 # Note that the *original* request shares the same record of 1207 # redirections with the sub-requests caused by the redirections. 1208 1209 # detect infinite loop redirect of a URL to itself 1210 req = Request(from_url, origin_req_host="example.com") 1211 count = 0 1212 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 1213 try: 1214 while 1: 1215 redirect(h, req, "http://example.com/") 1216 count = count + 1 1217 except urllib.error.HTTPError: 1218 # don't stop until max_repeats, because cookies may introduce state 1219 self.assertEqual(count, urllib.request.HTTPRedirectHandler.max_repeats) 1220 1221 # detect endless non-repeating chain of redirects 1222 req = Request(from_url, origin_req_host="example.com") 1223 count = 0 1224 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 1225 try: 1226 while 1: 1227 redirect(h, req, "http://example.com/%d" % count) 1228 count = count + 1 1229 except urllib.error.HTTPError: 1230 self.assertEqual(count, 1231 urllib.request.HTTPRedirectHandler.max_redirections) 1232 1233 def test_invalid_redirect(self): 1234 from_url = "http://example.com/a.html" 1235 valid_schemes = ['http','https','ftp'] 1236 invalid_schemes = ['file','imap','ldap'] 1237 schemeless_url = "example.com/b.html" 1238 h = urllib.request.HTTPRedirectHandler() 1239 o = h.parent = MockOpener() 1240 req = Request(from_url) 1241 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 1242 1243 for scheme in invalid_schemes: 1244 invalid_url = scheme + '://' + schemeless_url 1245 self.assertRaises(urllib.error.HTTPError, h.http_error_302, 1246 req, MockFile(), 302, "Security Loophole", 1247 MockHeaders({"location": invalid_url})) 1248 1249 for scheme in valid_schemes: 1250 valid_url = scheme + '://' + schemeless_url 1251 h.http_error_302(req, MockFile(), 302, "That's fine", 1252 MockHeaders({"location": valid_url})) 1253 self.assertEqual(o.req.get_full_url(), valid_url) 1254 1255 def test_relative_redirect(self): 1256 from_url = "http://example.com/a.html" 1257 relative_url = "/b.html" 1258 h = urllib.request.HTTPRedirectHandler() 1259 o = h.parent = MockOpener() 1260 req = Request(from_url) 1261 req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT 1262 1263 valid_url = urllib.parse.urljoin(from_url,relative_url) 1264 h.http_error_302(req, MockFile(), 302, "That's fine", 1265 MockHeaders({"location": valid_url})) 1266 self.assertEqual(o.req.get_full_url(), valid_url) 1267 1268 def test_cookie_redirect(self): 1269 # cookies shouldn't leak into redirected requests 1270 from http.cookiejar import CookieJar 1271 from test.test_http_cookiejar import interact_netscape 1272 1273 cj = CookieJar() 1274 interact_netscape(cj, "http://www.example.com/", "spam=eggs") 1275 hh = MockHTTPHandler(302, "Location: http://www.cracker.com/\r\n\r\n") 1276 hdeh = urllib.request.HTTPDefaultErrorHandler() 1277 hrh = urllib.request.HTTPRedirectHandler() 1278 cp = urllib.request.HTTPCookieProcessor(cj) 1279 o = build_test_opener(hh, hdeh, hrh, cp) 1280 o.open("http://www.example.com/") 1281 self.assertFalse(hh.req.has_header("Cookie")) 1282 1283 def test_redirect_fragment(self): 1284 redirected_url = 'http://www.example.com/index.html#OK\r\n\r\n' 1285 hh = MockHTTPHandler(302, 'Location: ' + redirected_url) 1286 hdeh = urllib.request.HTTPDefaultErrorHandler() 1287 hrh = urllib.request.HTTPRedirectHandler() 1288 o = build_test_opener(hh, hdeh, hrh) 1289 fp = o.open('http://www.example.com') 1290 self.assertEqual(fp.geturl(), redirected_url.strip()) 1291 1292 def test_redirect_no_path(self): 1293 # Issue 14132: Relative redirect strips original path 1294 1295 # clear _opener global variable 1296 self.addCleanup(urllib.request.urlcleanup) 1297 1298 real_class = http.client.HTTPConnection 1299 response1 = b"HTTP/1.1 302 Found\r\nLocation: ?query\r\n\r\n" 1300 http.client.HTTPConnection = test_urllib.fakehttp(response1) 1301 self.addCleanup(setattr, http.client, "HTTPConnection", real_class) 1302 urls = iter(("/path", "/path?query")) 1303 def request(conn, method, url, *pos, **kw): 1304 self.assertEqual(url, next(urls)) 1305 real_class.request(conn, method, url, *pos, **kw) 1306 # Change response for subsequent connection 1307 conn.__class__.fakedata = b"HTTP/1.1 200 OK\r\n\r\nHello!" 1308 http.client.HTTPConnection.request = request 1309 fp = urllib.request.urlopen("http://python.org/path") 1310 self.assertEqual(fp.geturl(), "http://python.org/path?query") 1311 1312 def test_redirect_encoding(self): 1313 # Some characters in the redirect target may need special handling, 1314 # but most ASCII characters should be treated as already encoded 1315 class Handler(urllib.request.HTTPHandler): 1316 def http_open(self, req): 1317 result = self.do_open(self.connection, req) 1318 self.last_buf = self.connection.buf 1319 # Set up a normal response for the next request 1320 self.connection = test_urllib.fakehttp( 1321 b'HTTP/1.1 200 OK\r\n' 1322 b'Content-Length: 3\r\n' 1323 b'\r\n' 1324 b'123' 1325 ) 1326 return result 1327 handler = Handler() 1328 opener = urllib.request.build_opener(handler) 1329 tests = ( 1330 (b'/p\xC3\xA5-dansk/', b'/p%C3%A5-dansk/'), 1331 (b'/spaced%20path/', b'/spaced%20path/'), 1332 (b'/spaced path/', b'/spaced%20path/'), 1333 (b'/?p\xC3\xA5-dansk', b'/?p%C3%A5-dansk'), 1334 ) 1335 for [location, result] in tests: 1336 with self.subTest(repr(location)): 1337 handler.connection = test_urllib.fakehttp( 1338 b'HTTP/1.1 302 Redirect\r\n' 1339 b'Location: ' + location + b'\r\n' 1340 b'\r\n' 1341 ) 1342 response = opener.open('http://example.com/') 1343 expected = b'GET ' + result + b' ' 1344 request = handler.last_buf 1345 self.assertTrue(request.startswith(expected), repr(request)) 1346 1347 def test_proxy(self): 1348 u = "proxy.example.com:3128" 1349 for d in dict(http=u), dict(HTTP=u): 1350 o = OpenerDirector() 1351 ph = urllib.request.ProxyHandler(d) 1352 o.add_handler(ph) 1353 meth_spec = [ 1354 [("http_open", "return response")] 1355 ] 1356 handlers = add_ordered_mock_handlers(o, meth_spec) 1357 1358 req = Request("http://acme.example.com/") 1359 self.assertEqual(req.host, "acme.example.com") 1360 o.open(req) 1361 self.assertEqual(req.host, u) 1362 self.assertEqual([(handlers[0], "http_open")], 1363 [tup[0:2] for tup in o.calls]) 1364 1365 def test_proxy_no_proxy(self): 1366 os.environ['no_proxy'] = 'python.org' 1367 o = OpenerDirector() 1368 ph = urllib.request.ProxyHandler(dict(http="proxy.example.com")) 1369 o.add_handler(ph) 1370 req = Request("http://www.perl.org/") 1371 self.assertEqual(req.host, "www.perl.org") 1372 o.open(req) 1373 self.assertEqual(req.host, "proxy.example.com") 1374 req = Request("http://www.python.org") 1375 self.assertEqual(req.host, "www.python.org") 1376 o.open(req) 1377 self.assertEqual(req.host, "www.python.org") 1378 del os.environ['no_proxy'] 1379 1380 def test_proxy_no_proxy_all(self): 1381 os.environ['no_proxy'] = '*' 1382 o = OpenerDirector() 1383 ph = urllib.request.ProxyHandler(dict(http="proxy.example.com")) 1384 o.add_handler(ph) 1385 req = Request("http://www.python.org") 1386 self.assertEqual(req.host, "www.python.org") 1387 o.open(req) 1388 self.assertEqual(req.host, "www.python.org") 1389 del os.environ['no_proxy'] 1390 1391 def test_proxy_https(self): 1392 o = OpenerDirector() 1393 ph = urllib.request.ProxyHandler(dict(https="proxy.example.com:3128")) 1394 o.add_handler(ph) 1395 meth_spec = [ 1396 [("https_open", "return response")] 1397 ] 1398 handlers = add_ordered_mock_handlers(o, meth_spec) 1399 1400 req = Request("https://www.example.com/") 1401 self.assertEqual(req.host, "www.example.com") 1402 o.open(req) 1403 self.assertEqual(req.host, "proxy.example.com:3128") 1404 self.assertEqual([(handlers[0], "https_open")], 1405 [tup[0:2] for tup in o.calls]) 1406 1407 def test_proxy_https_proxy_authorization(self): 1408 o = OpenerDirector() 1409 ph = urllib.request.ProxyHandler(dict(https='proxy.example.com:3128')) 1410 o.add_handler(ph) 1411 https_handler = MockHTTPSHandler() 1412 o.add_handler(https_handler) 1413 req = Request("https://www.example.com/") 1414 req.add_header("Proxy-Authorization", "FooBar") 1415 req.add_header("User-Agent", "Grail") 1416 self.assertEqual(req.host, "www.example.com") 1417 self.assertIsNone(req._tunnel_host) 1418 o.open(req) 1419 # Verify Proxy-Authorization gets tunneled to request. 1420 # httpsconn req_headers do not have the Proxy-Authorization header but 1421 # the req will have. 1422 self.assertNotIn(("Proxy-Authorization", "FooBar"), 1423 https_handler.httpconn.req_headers) 1424 self.assertIn(("User-Agent", "Grail"), 1425 https_handler.httpconn.req_headers) 1426 self.assertIsNotNone(req._tunnel_host) 1427 self.assertEqual(req.host, "proxy.example.com:3128") 1428 self.assertEqual(req.get_header("Proxy-authorization"), "FooBar") 1429 1430 @unittest.skipUnless(sys.platform == 'darwin', "only relevant for OSX") 1431 def test_osx_proxy_bypass(self): 1432 bypass = { 1433 'exclude_simple': False, 1434 'exceptions': ['foo.bar', '*.bar.com', '127.0.0.1', '10.10', 1435 '10.0/16'] 1436 } 1437 # Check hosts that should trigger the proxy bypass 1438 for host in ('foo.bar', 'www.bar.com', '127.0.0.1', '10.10.0.1', 1439 '10.0.0.1'): 1440 self.assertTrue(_proxy_bypass_macosx_sysconf(host, bypass), 1441 'expected bypass of %s to be True' % host) 1442 # Check hosts that should not trigger the proxy bypass 1443 for host in ('abc.foo.bar', 'bar.com', '127.0.0.2', '10.11.0.1', 1444 'notinbypass'): 1445 self.assertFalse(_proxy_bypass_macosx_sysconf(host, bypass), 1446 'expected bypass of %s to be False' % host) 1447 1448 # Check the exclude_simple flag 1449 bypass = {'exclude_simple': True, 'exceptions': []} 1450 self.assertTrue(_proxy_bypass_macosx_sysconf('test', bypass)) 1451 1452 # Check that invalid prefix lengths are ignored 1453 bypass = { 1454 'exclude_simple': False, 1455 'exceptions': [ '10.0.0.0/40', '172.19.10.0/24' ] 1456 } 1457 host = '172.19.10.5' 1458 self.assertTrue(_proxy_bypass_macosx_sysconf(host, bypass), 1459 'expected bypass of %s to be True' % host) 1460 host = '10.0.1.5' 1461 self.assertFalse(_proxy_bypass_macosx_sysconf(host, bypass), 1462 'expected bypass of %s to be False' % host) 1463 1464 def check_basic_auth(self, headers, realm): 1465 with self.subTest(realm=realm, headers=headers): 1466 opener = OpenerDirector() 1467 password_manager = MockPasswordManager() 1468 auth_handler = urllib.request.HTTPBasicAuthHandler(password_manager) 1469 body = '\r\n'.join(headers) + '\r\n\r\n' 1470 http_handler = MockHTTPHandler(401, body) 1471 opener.add_handler(auth_handler) 1472 opener.add_handler(http_handler) 1473 self._test_basic_auth(opener, auth_handler, "Authorization", 1474 realm, http_handler, password_manager, 1475 "http://acme.example.com/protected", 1476 "http://acme.example.com/protected") 1477 1478 def test_basic_auth(self): 1479 realm = "realm2@example.com" 1480 realm2 = "realm2@example.com" 1481 basic = f'Basic realm="{realm}"' 1482 basic2 = f'Basic realm="{realm2}"' 1483 other_no_realm = 'Otherscheme xxx' 1484 digest = (f'Digest realm="{realm2}", ' 1485 f'qop="auth, auth-int", ' 1486 f'nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", ' 1487 f'opaque="5ccc069c403ebaf9f0171e9517f40e41"') 1488 for realm_str in ( 1489 # test "quote" and 'quote' 1490 f'Basic realm="{realm}"', 1491 f"Basic realm='{realm}'", 1492 1493 # charset is ignored 1494 f'Basic realm="{realm}", charset="UTF-8"', 1495 1496 # Multiple challenges per header 1497 f'{basic}, {basic2}', 1498 f'{basic}, {other_no_realm}', 1499 f'{other_no_realm}, {basic}', 1500 f'{basic}, {digest}', 1501 f'{digest}, {basic}', 1502 ): 1503 headers = [f'WWW-Authenticate: {realm_str}'] 1504 self.check_basic_auth(headers, realm) 1505 1506 # no quote: expect a warning 1507 with warnings_helper.check_warnings(("Basic Auth Realm was unquoted", 1508 UserWarning)): 1509 headers = [f'WWW-Authenticate: Basic realm={realm}'] 1510 self.check_basic_auth(headers, realm) 1511 1512 # Multiple headers: one challenge per header. 1513 # Use the first Basic realm. 1514 for challenges in ( 1515 [basic, basic2], 1516 [basic, digest], 1517 [digest, basic], 1518 ): 1519 headers = [f'WWW-Authenticate: {challenge}' 1520 for challenge in challenges] 1521 self.check_basic_auth(headers, realm) 1522 1523 def test_proxy_basic_auth(self): 1524 opener = OpenerDirector() 1525 ph = urllib.request.ProxyHandler(dict(http="proxy.example.com:3128")) 1526 opener.add_handler(ph) 1527 password_manager = MockPasswordManager() 1528 auth_handler = urllib.request.ProxyBasicAuthHandler(password_manager) 1529 realm = "ACME Networks" 1530 http_handler = MockHTTPHandler( 1531 407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm) 1532 opener.add_handler(auth_handler) 1533 opener.add_handler(http_handler) 1534 self._test_basic_auth(opener, auth_handler, "Proxy-authorization", 1535 realm, http_handler, password_manager, 1536 "http://acme.example.com:3128/protected", 1537 "proxy.example.com:3128", 1538 ) 1539 1540 def test_basic_and_digest_auth_handlers(self): 1541 # HTTPDigestAuthHandler raised an exception if it couldn't handle a 40* 1542 # response (http://python.org/sf/1479302), where it should instead 1543 # return None to allow another handler (especially 1544 # HTTPBasicAuthHandler) to handle the response. 1545 1546 # Also (http://python.org/sf/14797027, RFC 2617 section 1.2), we must 1547 # try digest first (since it's the strongest auth scheme), so we record 1548 # order of calls here to check digest comes first: 1549 class RecordingOpenerDirector(OpenerDirector): 1550 def __init__(self): 1551 OpenerDirector.__init__(self) 1552 self.recorded = [] 1553 1554 def record(self, info): 1555 self.recorded.append(info) 1556 1557 class TestDigestAuthHandler(urllib.request.HTTPDigestAuthHandler): 1558 def http_error_401(self, *args, **kwds): 1559 self.parent.record("digest") 1560 urllib.request.HTTPDigestAuthHandler.http_error_401(self, 1561 *args, **kwds) 1562 1563 class TestBasicAuthHandler(urllib.request.HTTPBasicAuthHandler): 1564 def http_error_401(self, *args, **kwds): 1565 self.parent.record("basic") 1566 urllib.request.HTTPBasicAuthHandler.http_error_401(self, 1567 *args, **kwds) 1568 1569 opener = RecordingOpenerDirector() 1570 password_manager = MockPasswordManager() 1571 digest_handler = TestDigestAuthHandler(password_manager) 1572 basic_handler = TestBasicAuthHandler(password_manager) 1573 realm = "ACME Networks" 1574 http_handler = MockHTTPHandler( 1575 401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm) 1576 opener.add_handler(basic_handler) 1577 opener.add_handler(digest_handler) 1578 opener.add_handler(http_handler) 1579 1580 # check basic auth isn't blocked by digest handler failing 1581 self._test_basic_auth(opener, basic_handler, "Authorization", 1582 realm, http_handler, password_manager, 1583 "http://acme.example.com/protected", 1584 "http://acme.example.com/protected", 1585 ) 1586 # check digest was tried before basic (twice, because 1587 # _test_basic_auth called .open() twice) 1588 self.assertEqual(opener.recorded, ["digest", "basic"]*2) 1589 1590 def test_unsupported_auth_digest_handler(self): 1591 opener = OpenerDirector() 1592 # While using DigestAuthHandler 1593 digest_auth_handler = urllib.request.HTTPDigestAuthHandler(None) 1594 http_handler = MockHTTPHandler( 1595 401, 'WWW-Authenticate: Kerberos\r\n\r\n') 1596 opener.add_handler(digest_auth_handler) 1597 opener.add_handler(http_handler) 1598 self.assertRaises(ValueError, opener.open, "http://www.example.com") 1599 1600 def test_unsupported_auth_basic_handler(self): 1601 # While using BasicAuthHandler 1602 opener = OpenerDirector() 1603 basic_auth_handler = urllib.request.HTTPBasicAuthHandler(None) 1604 http_handler = MockHTTPHandler( 1605 401, 'WWW-Authenticate: NTLM\r\n\r\n') 1606 opener.add_handler(basic_auth_handler) 1607 opener.add_handler(http_handler) 1608 self.assertRaises(ValueError, opener.open, "http://www.example.com") 1609 1610 def _test_basic_auth(self, opener, auth_handler, auth_header, 1611 realm, http_handler, password_manager, 1612 request_url, protected_url): 1613 import base64 1614 user, password = "wile", "coyote" 1615 1616 # .add_password() fed through to password manager 1617 auth_handler.add_password(realm, request_url, user, password) 1618 self.assertEqual(realm, password_manager.realm) 1619 self.assertEqual(request_url, password_manager.url) 1620 self.assertEqual(user, password_manager.user) 1621 self.assertEqual(password, password_manager.password) 1622 1623 opener.open(request_url) 1624 1625 # should have asked the password manager for the username/password 1626 self.assertEqual(password_manager.target_realm, realm) 1627 self.assertEqual(password_manager.target_url, protected_url) 1628 1629 # expect one request without authorization, then one with 1630 self.assertEqual(len(http_handler.requests), 2) 1631 self.assertFalse(http_handler.requests[0].has_header(auth_header)) 1632 userpass = bytes('%s:%s' % (user, password), "ascii") 1633 auth_hdr_value = ('Basic ' + 1634 base64.encodebytes(userpass).strip().decode()) 1635 self.assertEqual(http_handler.requests[1].get_header(auth_header), 1636 auth_hdr_value) 1637 self.assertEqual(http_handler.requests[1].unredirected_hdrs[auth_header], 1638 auth_hdr_value) 1639 # if the password manager can't find a password, the handler won't 1640 # handle the HTTP auth error 1641 password_manager.user = password_manager.password = None 1642 http_handler.reset() 1643 opener.open(request_url) 1644 self.assertEqual(len(http_handler.requests), 1) 1645 self.assertFalse(http_handler.requests[0].has_header(auth_header)) 1646 1647 def test_basic_prior_auth_auto_send(self): 1648 # Assume already authenticated if is_authenticated=True 1649 # for APIs like Github that don't return 401 1650 1651 user, password = "wile", "coyote" 1652 request_url = "http://acme.example.com/protected" 1653 1654 http_handler = MockHTTPHandlerCheckAuth(200) 1655 1656 pwd_manager = HTTPPasswordMgrWithPriorAuth() 1657 auth_prior_handler = HTTPBasicAuthHandler(pwd_manager) 1658 auth_prior_handler.add_password( 1659 None, request_url, user, password, is_authenticated=True) 1660 1661 is_auth = pwd_manager.is_authenticated(request_url) 1662 self.assertTrue(is_auth) 1663 1664 opener = OpenerDirector() 1665 opener.add_handler(auth_prior_handler) 1666 opener.add_handler(http_handler) 1667 1668 opener.open(request_url) 1669 1670 # expect request to be sent with auth header 1671 self.assertTrue(http_handler.has_auth_header) 1672 1673 def test_basic_prior_auth_send_after_first_success(self): 1674 # Auto send auth header after authentication is successful once 1675 1676 user, password = 'wile', 'coyote' 1677 request_url = 'http://acme.example.com/protected' 1678 realm = 'ACME' 1679 1680 pwd_manager = HTTPPasswordMgrWithPriorAuth() 1681 auth_prior_handler = HTTPBasicAuthHandler(pwd_manager) 1682 auth_prior_handler.add_password(realm, request_url, user, password) 1683 1684 is_auth = pwd_manager.is_authenticated(request_url) 1685 self.assertFalse(is_auth) 1686 1687 opener = OpenerDirector() 1688 opener.add_handler(auth_prior_handler) 1689 1690 http_handler = MockHTTPHandler( 1691 401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % None) 1692 opener.add_handler(http_handler) 1693 1694 opener.open(request_url) 1695 1696 is_auth = pwd_manager.is_authenticated(request_url) 1697 self.assertTrue(is_auth) 1698 1699 http_handler = MockHTTPHandlerCheckAuth(200) 1700 self.assertFalse(http_handler.has_auth_header) 1701 1702 opener = OpenerDirector() 1703 opener.add_handler(auth_prior_handler) 1704 opener.add_handler(http_handler) 1705 1706 # After getting 200 from MockHTTPHandler 1707 # Next request sends header in the first request 1708 opener.open(request_url) 1709 1710 # expect request to be sent with auth header 1711 self.assertTrue(http_handler.has_auth_header) 1712 1713 def test_http_closed(self): 1714 """Test the connection is cleaned up when the response is closed""" 1715 for (transfer, data) in ( 1716 ("Connection: close", b"data"), 1717 ("Transfer-Encoding: chunked", b"4\r\ndata\r\n0\r\n\r\n"), 1718 ("Content-Length: 4", b"data"), 1719 ): 1720 header = "HTTP/1.1 200 OK\r\n{}\r\n\r\n".format(transfer) 1721 conn = test_urllib.fakehttp(header.encode() + data) 1722 handler = urllib.request.AbstractHTTPHandler() 1723 req = Request("http://dummy/") 1724 req.timeout = None 1725 with handler.do_open(conn, req) as resp: 1726 resp.read() 1727 self.assertTrue(conn.fakesock.closed, 1728 "Connection not closed with {!r}".format(transfer)) 1729 1730 def test_invalid_closed(self): 1731 """Test the connection is cleaned up after an invalid response""" 1732 conn = test_urllib.fakehttp(b"") 1733 handler = urllib.request.AbstractHTTPHandler() 1734 req = Request("http://dummy/") 1735 req.timeout = None 1736 with self.assertRaises(http.client.BadStatusLine): 1737 handler.do_open(conn, req) 1738 self.assertTrue(conn.fakesock.closed, "Connection not closed") 1739 1740 1741class MiscTests(unittest.TestCase): 1742 1743 def opener_has_handler(self, opener, handler_class): 1744 self.assertTrue(any(h.__class__ == handler_class 1745 for h in opener.handlers)) 1746 1747 def test_build_opener(self): 1748 class MyHTTPHandler(urllib.request.HTTPHandler): 1749 pass 1750 1751 class FooHandler(urllib.request.BaseHandler): 1752 def foo_open(self): 1753 pass 1754 1755 class BarHandler(urllib.request.BaseHandler): 1756 def bar_open(self): 1757 pass 1758 1759 build_opener = urllib.request.build_opener 1760 1761 o = build_opener(FooHandler, BarHandler) 1762 self.opener_has_handler(o, FooHandler) 1763 self.opener_has_handler(o, BarHandler) 1764 1765 # can take a mix of classes and instances 1766 o = build_opener(FooHandler, BarHandler()) 1767 self.opener_has_handler(o, FooHandler) 1768 self.opener_has_handler(o, BarHandler) 1769 1770 # subclasses of default handlers override default handlers 1771 o = build_opener(MyHTTPHandler) 1772 self.opener_has_handler(o, MyHTTPHandler) 1773 1774 # a particular case of overriding: default handlers can be passed 1775 # in explicitly 1776 o = build_opener() 1777 self.opener_has_handler(o, urllib.request.HTTPHandler) 1778 o = build_opener(urllib.request.HTTPHandler) 1779 self.opener_has_handler(o, urllib.request.HTTPHandler) 1780 o = build_opener(urllib.request.HTTPHandler()) 1781 self.opener_has_handler(o, urllib.request.HTTPHandler) 1782 1783 # Issue2670: multiple handlers sharing the same base class 1784 class MyOtherHTTPHandler(urllib.request.HTTPHandler): 1785 pass 1786 1787 o = build_opener(MyHTTPHandler, MyOtherHTTPHandler) 1788 self.opener_has_handler(o, MyHTTPHandler) 1789 self.opener_has_handler(o, MyOtherHTTPHandler) 1790 1791 @unittest.skipUnless(support.is_resource_enabled('network'), 1792 'test requires network access') 1793 def test_issue16464(self): 1794 with socket_helper.transient_internet("http://www.example.com/"): 1795 opener = urllib.request.build_opener() 1796 request = urllib.request.Request("http://www.example.com/") 1797 self.assertEqual(None, request.data) 1798 1799 opener.open(request, "1".encode("us-ascii")) 1800 self.assertEqual(b"1", request.data) 1801 self.assertEqual("1", request.get_header("Content-length")) 1802 1803 opener.open(request, "1234567890".encode("us-ascii")) 1804 self.assertEqual(b"1234567890", request.data) 1805 self.assertEqual("10", request.get_header("Content-length")) 1806 1807 def test_HTTPError_interface(self): 1808 """ 1809 Issue 13211 reveals that HTTPError didn't implement the URLError 1810 interface even though HTTPError is a subclass of URLError. 1811 """ 1812 msg = 'something bad happened' 1813 url = code = fp = None 1814 hdrs = 'Content-Length: 42' 1815 err = urllib.error.HTTPError(url, code, msg, hdrs, fp) 1816 self.assertTrue(hasattr(err, 'reason')) 1817 self.assertEqual(err.reason, 'something bad happened') 1818 self.assertTrue(hasattr(err, 'headers')) 1819 self.assertEqual(err.headers, 'Content-Length: 42') 1820 expected_errmsg = 'HTTP Error %s: %s' % (err.code, err.msg) 1821 self.assertEqual(str(err), expected_errmsg) 1822 expected_errmsg = '<HTTPError %s: %r>' % (err.code, err.msg) 1823 self.assertEqual(repr(err), expected_errmsg) 1824 1825 def test_parse_proxy(self): 1826 parse_proxy_test_cases = [ 1827 ('proxy.example.com', 1828 (None, None, None, 'proxy.example.com')), 1829 ('proxy.example.com:3128', 1830 (None, None, None, 'proxy.example.com:3128')), 1831 ('proxy.example.com', (None, None, None, 'proxy.example.com')), 1832 ('proxy.example.com:3128', 1833 (None, None, None, 'proxy.example.com:3128')), 1834 # The authority component may optionally include userinfo 1835 # (assumed to be # username:password): 1836 ('joe:password@proxy.example.com', 1837 (None, 'joe', 'password', 'proxy.example.com')), 1838 ('joe:password@proxy.example.com:3128', 1839 (None, 'joe', 'password', 'proxy.example.com:3128')), 1840 #Examples with URLS 1841 ('http://proxy.example.com/', 1842 ('http', None, None, 'proxy.example.com')), 1843 ('http://proxy.example.com:3128/', 1844 ('http', None, None, 'proxy.example.com:3128')), 1845 ('http://joe:password@proxy.example.com/', 1846 ('http', 'joe', 'password', 'proxy.example.com')), 1847 ('http://joe:password@proxy.example.com:3128', 1848 ('http', 'joe', 'password', 'proxy.example.com:3128')), 1849 # Everything after the authority is ignored 1850 ('ftp://joe:password@proxy.example.com/rubbish:3128', 1851 ('ftp', 'joe', 'password', 'proxy.example.com')), 1852 # Test for no trailing '/' case 1853 ('http://joe:password@proxy.example.com', 1854 ('http', 'joe', 'password', 'proxy.example.com')), 1855 # Testcases with '/' character in username, password 1856 ('http://user/name:password@localhost:22', 1857 ('http', 'user/name', 'password', 'localhost:22')), 1858 ('http://username:pass/word@localhost:22', 1859 ('http', 'username', 'pass/word', 'localhost:22')), 1860 ('http://user/name:pass/word@localhost:22', 1861 ('http', 'user/name', 'pass/word', 'localhost:22')), 1862 ] 1863 1864 1865 for tc, expected in parse_proxy_test_cases: 1866 self.assertEqual(_parse_proxy(tc), expected) 1867 1868 self.assertRaises(ValueError, _parse_proxy, 'file:/ftp.example.com'), 1869 1870 def test_unsupported_algorithm(self): 1871 handler = AbstractDigestAuthHandler() 1872 with self.assertRaises(ValueError) as exc: 1873 handler.get_algorithm_impls('invalid') 1874 self.assertEqual( 1875 str(exc.exception), 1876 "Unsupported digest authentication algorithm 'invalid'" 1877 ) 1878 1879 1880class RequestTests(unittest.TestCase): 1881 class PutRequest(Request): 1882 method = 'PUT' 1883 1884 def setUp(self): 1885 self.get = Request("http://www.python.org/~jeremy/") 1886 self.post = Request("http://www.python.org/~jeremy/", 1887 "data", 1888 headers={"X-Test": "test"}) 1889 self.head = Request("http://www.python.org/~jeremy/", method='HEAD') 1890 self.put = self.PutRequest("http://www.python.org/~jeremy/") 1891 self.force_post = self.PutRequest("http://www.python.org/~jeremy/", 1892 method="POST") 1893 1894 def test_method(self): 1895 self.assertEqual("POST", self.post.get_method()) 1896 self.assertEqual("GET", self.get.get_method()) 1897 self.assertEqual("HEAD", self.head.get_method()) 1898 self.assertEqual("PUT", self.put.get_method()) 1899 self.assertEqual("POST", self.force_post.get_method()) 1900 1901 def test_data(self): 1902 self.assertFalse(self.get.data) 1903 self.assertEqual("GET", self.get.get_method()) 1904 self.get.data = "spam" 1905 self.assertTrue(self.get.data) 1906 self.assertEqual("POST", self.get.get_method()) 1907 1908 # issue 16464 1909 # if we change data we need to remove content-length header 1910 # (cause it's most probably calculated for previous value) 1911 def test_setting_data_should_remove_content_length(self): 1912 self.assertNotIn("Content-length", self.get.unredirected_hdrs) 1913 self.get.add_unredirected_header("Content-length", 42) 1914 self.assertEqual(42, self.get.unredirected_hdrs["Content-length"]) 1915 self.get.data = "spam" 1916 self.assertNotIn("Content-length", self.get.unredirected_hdrs) 1917 1918 # issue 17485 same for deleting data. 1919 def test_deleting_data_should_remove_content_length(self): 1920 self.assertNotIn("Content-length", self.get.unredirected_hdrs) 1921 self.get.data = 'foo' 1922 self.get.add_unredirected_header("Content-length", 3) 1923 self.assertEqual(3, self.get.unredirected_hdrs["Content-length"]) 1924 del self.get.data 1925 self.assertNotIn("Content-length", self.get.unredirected_hdrs) 1926 1927 def test_get_full_url(self): 1928 self.assertEqual("http://www.python.org/~jeremy/", 1929 self.get.get_full_url()) 1930 1931 def test_selector(self): 1932 self.assertEqual("/~jeremy/", self.get.selector) 1933 req = Request("http://www.python.org/") 1934 self.assertEqual("/", req.selector) 1935 1936 def test_get_type(self): 1937 self.assertEqual("http", self.get.type) 1938 1939 def test_get_host(self): 1940 self.assertEqual("www.python.org", self.get.host) 1941 1942 def test_get_host_unquote(self): 1943 req = Request("http://www.%70ython.org/") 1944 self.assertEqual("www.python.org", req.host) 1945 1946 def test_proxy(self): 1947 self.assertFalse(self.get.has_proxy()) 1948 self.get.set_proxy("www.perl.org", "http") 1949 self.assertTrue(self.get.has_proxy()) 1950 self.assertEqual("www.python.org", self.get.origin_req_host) 1951 self.assertEqual("www.perl.org", self.get.host) 1952 1953 def test_wrapped_url(self): 1954 req = Request("<URL:http://www.python.org>") 1955 self.assertEqual("www.python.org", req.host) 1956 1957 def test_url_fragment(self): 1958 req = Request("http://www.python.org/?qs=query#fragment=true") 1959 self.assertEqual("/?qs=query", req.selector) 1960 req = Request("http://www.python.org/#fun=true") 1961 self.assertEqual("/", req.selector) 1962 1963 # Issue 11703: geturl() omits fragment in the original URL. 1964 url = 'http://docs.python.org/library/urllib2.html#OK' 1965 req = Request(url) 1966 self.assertEqual(req.get_full_url(), url) 1967 1968 def test_url_fullurl_get_full_url(self): 1969 urls = ['http://docs.python.org', 1970 'http://docs.python.org/library/urllib2.html#OK', 1971 'http://www.python.org/?qs=query#fragment=true'] 1972 for url in urls: 1973 req = Request(url) 1974 self.assertEqual(req.get_full_url(), req.full_url) 1975 1976 1977if __name__ == "__main__": 1978 unittest.main() 1979