• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import unittest
2from test import test_support
3from test import test_urllib
4
5import os
6import socket
7import StringIO
8
9import urllib2
10from urllib2 import Request, OpenerDirector, AbstractDigestAuthHandler
11import httplib
12
13try:
14    import ssl
15except ImportError:
16    ssl = None
17
18# XXX
19# Request
20# CacheFTPHandler (hard to write)
21# parse_keqv_list, parse_http_list, HTTPDigestAuthHandler
22
23class TrivialTests(unittest.TestCase):
24    def test_trivial(self):
25        # A couple trivial tests
26
27        self.assertRaises(ValueError, urllib2.urlopen, 'bogus url')
28
29        # XXX Name hacking to get this to work on Windows.
30        fname = os.path.abspath(urllib2.__file__).replace(os.sep, '/')
31
32        # And more hacking to get it to work on MacOS. This assumes
33        # urllib.pathname2url works, unfortunately...
34        if os.name == 'riscos':
35            import string
36            fname = os.expand(fname)
37            fname = fname.translate(string.maketrans("/.", "./"))
38
39        if os.name == 'nt':
40            file_url = "file:///%s" % fname
41        else:
42            file_url = "file://%s" % fname
43
44        f = urllib2.urlopen(file_url)
45
46        buf = f.read()
47        f.close()
48
49    def test_parse_http_list(self):
50        tests = [('a,b,c', ['a', 'b', 'c']),
51                 ('path"o,l"og"i"cal, example', ['path"o,l"og"i"cal', 'example']),
52                 ('a, b, "c", "d", "e,f", g, h', ['a', 'b', '"c"', '"d"', '"e,f"', 'g', 'h']),
53                 ('a="b\\"c", d="e\\,f", g="h\\\\i"', ['a="b"c"', 'd="e,f"', 'g="h\\i"'])]
54        for string, list in tests:
55            self.assertEqual(urllib2.parse_http_list(string), list)
56
57    @unittest.skipUnless(ssl, "ssl module required")
58    def test_cafile_and_context(self):
59        context = ssl.create_default_context()
60        with self.assertRaises(ValueError):
61            urllib2.urlopen(
62                "https://localhost", cafile="/nonexistent/path", context=context
63            )
64
65
66def test_request_headers_dict():
67    """
68    The Request.headers dictionary is not a documented interface.  It should
69    stay that way, because the complete set of headers are only accessible
70    through the .get_header(), .has_header(), .header_items() interface.
71    However, .headers pre-dates those methods, and so real code will be using
72    the dictionary.
73
74    The introduction in 2.4 of those methods was a mistake for the same reason:
75    code that previously saw all (urllib2 user)-provided headers in .headers
76    now sees only a subset (and the function interface is ugly and incomplete).
77    A better change would have been to replace .headers dict with a dict
78    subclass (or UserDict.DictMixin instance?)  that preserved the .headers
79    interface and also provided access to the "unredirected" headers.  It's
80    probably too late to fix that, though.
81
82
83    Check .capitalize() case normalization:
84
85    >>> url = "http://example.com"
86    >>> Request(url, headers={"Spam-eggs": "blah"}).headers["Spam-eggs"]
87    'blah'
88    >>> Request(url, headers={"spam-EggS": "blah"}).headers["Spam-eggs"]
89    'blah'
90
91    Currently, Request(url, "Spam-eggs").headers["Spam-Eggs"] raises KeyError,
92    but that could be changed in future.
93
94    """
95
96def test_request_headers_methods():
97    """
98    Note the case normalization of header names here, to .capitalize()-case.
99    This should be preserved for backwards-compatibility.  (In the HTTP case,
100    normalization to .title()-case is done by urllib2 before sending headers to
101    httplib).
102
103    >>> url = "http://example.com"
104    >>> r = Request(url, headers={"Spam-eggs": "blah"})
105    >>> r.has_header("Spam-eggs")
106    True
107    >>> r.header_items()
108    [('Spam-eggs', 'blah')]
109    >>> r.add_header("Foo-Bar", "baz")
110    >>> items = r.header_items()
111    >>> items.sort()
112    >>> items
113    [('Foo-bar', 'baz'), ('Spam-eggs', 'blah')]
114
115    Note that e.g. r.has_header("spam-EggS") is currently False, and
116    r.get_header("spam-EggS") returns None, but that could be changed in
117    future.
118
119    >>> r.has_header("Not-there")
120    False
121    >>> print r.get_header("Not-there")
122    None
123    >>> r.get_header("Not-there", "default")
124    'default'
125
126    """
127
128
129def test_password_manager(self):
130    """
131    >>> mgr = urllib2.HTTPPasswordMgr()
132    >>> add = mgr.add_password
133    >>> add("Some Realm", "http://example.com/", "joe", "password")
134    >>> add("Some Realm", "http://example.com/ni", "ni", "ni")
135    >>> add("c", "http://example.com/foo", "foo", "ni")
136    >>> add("c", "http://example.com/bar", "bar", "nini")
137    >>> add("b", "http://example.com/", "first", "blah")
138    >>> add("b", "http://example.com/", "second", "spam")
139    >>> add("a", "http://example.com", "1", "a")
140    >>> add("Some Realm", "http://c.example.com:3128", "3", "c")
141    >>> add("Some Realm", "d.example.com", "4", "d")
142    >>> add("Some Realm", "e.example.com:3128", "5", "e")
143
144    >>> mgr.find_user_password("Some Realm", "example.com")
145    ('joe', 'password')
146    >>> mgr.find_user_password("Some Realm", "http://example.com")
147    ('joe', 'password')
148    >>> mgr.find_user_password("Some Realm", "http://example.com/")
149    ('joe', 'password')
150    >>> mgr.find_user_password("Some Realm", "http://example.com/spam")
151    ('joe', 'password')
152    >>> mgr.find_user_password("Some Realm", "http://example.com/spam/spam")
153    ('joe', 'password')
154    >>> mgr.find_user_password("c", "http://example.com/foo")
155    ('foo', 'ni')
156    >>> mgr.find_user_password("c", "http://example.com/bar")
157    ('bar', 'nini')
158
159    Actually, this is really undefined ATM
160##     Currently, we use the highest-level path where more than one match:
161
162##     >>> mgr.find_user_password("Some Realm", "http://example.com/ni")
163##     ('joe', 'password')
164
165    Use latest add_password() in case of conflict:
166
167    >>> mgr.find_user_password("b", "http://example.com/")
168    ('second', 'spam')
169
170    No special relationship between a.example.com and example.com:
171
172    >>> mgr.find_user_password("a", "http://example.com/")
173    ('1', 'a')
174    >>> mgr.find_user_password("a", "http://a.example.com/")
175    (None, None)
176
177    Ports:
178
179    >>> mgr.find_user_password("Some Realm", "c.example.com")
180    (None, None)
181    >>> mgr.find_user_password("Some Realm", "c.example.com:3128")
182    ('3', 'c')
183    >>> mgr.find_user_password("Some Realm", "http://c.example.com:3128")
184    ('3', 'c')
185    >>> mgr.find_user_password("Some Realm", "d.example.com")
186    ('4', 'd')
187    >>> mgr.find_user_password("Some Realm", "e.example.com:3128")
188    ('5', 'e')
189
190    """
191    pass
192
193
194def test_password_manager_default_port(self):
195    """
196    >>> mgr = urllib2.HTTPPasswordMgr()
197    >>> add = mgr.add_password
198
199    The point to note here is that we can't guess the default port if there's
200    no scheme.  This applies to both add_password and find_user_password.
201
202    >>> add("f", "http://g.example.com:80", "10", "j")
203    >>> add("g", "http://h.example.com", "11", "k")
204    >>> add("h", "i.example.com:80", "12", "l")
205    >>> add("i", "j.example.com", "13", "m")
206    >>> mgr.find_user_password("f", "g.example.com:100")
207    (None, None)
208    >>> mgr.find_user_password("f", "g.example.com:80")
209    ('10', 'j')
210    >>> mgr.find_user_password("f", "g.example.com")
211    (None, None)
212    >>> mgr.find_user_password("f", "http://g.example.com:100")
213    (None, None)
214    >>> mgr.find_user_password("f", "http://g.example.com:80")
215    ('10', 'j')
216    >>> mgr.find_user_password("f", "http://g.example.com")
217    ('10', 'j')
218    >>> mgr.find_user_password("g", "h.example.com")
219    ('11', 'k')
220    >>> mgr.find_user_password("g", "h.example.com:80")
221    ('11', 'k')
222    >>> mgr.find_user_password("g", "http://h.example.com:80")
223    ('11', 'k')
224    >>> mgr.find_user_password("h", "i.example.com")
225    (None, None)
226    >>> mgr.find_user_password("h", "i.example.com:80")
227    ('12', 'l')
228    >>> mgr.find_user_password("h", "http://i.example.com:80")
229    ('12', 'l')
230    >>> mgr.find_user_password("i", "j.example.com")
231    ('13', 'm')
232    >>> mgr.find_user_password("i", "j.example.com:80")
233    (None, None)
234    >>> mgr.find_user_password("i", "http://j.example.com")
235    ('13', 'm')
236    >>> mgr.find_user_password("i", "http://j.example.com:80")
237    (None, None)
238
239    """
240
241class MockOpener:
242    addheaders = []
243    def open(self, req, data=None,timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
244        self.req, self.data, self.timeout  = req, data, timeout
245    def error(self, proto, *args):
246        self.proto, self.args = proto, args
247
248class MockFile:
249    def read(self, count=None): pass
250    def readline(self, count=None): pass
251    def close(self): pass
252
253class MockHeaders(dict):
254    def getheaders(self, name):
255        return self.values()
256
257class MockResponse(StringIO.StringIO):
258    def __init__(self, code, msg, headers, data, url=None):
259        StringIO.StringIO.__init__(self, data)
260        self.code, self.msg, self.headers, self.url = code, msg, headers, url
261    def info(self):
262        return self.headers
263    def geturl(self):
264        return self.url
265
266class MockCookieJar:
267    def add_cookie_header(self, request):
268        self.ach_req = request
269    def extract_cookies(self, response, request):
270        self.ec_req, self.ec_r = request, response
271
272class FakeMethod:
273    def __init__(self, meth_name, action, handle):
274        self.meth_name = meth_name
275        self.handle = handle
276        self.action = action
277    def __call__(self, *args):
278        return self.handle(self.meth_name, self.action, *args)
279
280class MockHTTPResponse:
281    def __init__(self, fp, msg, status, reason):
282        self.fp = fp
283        self.msg = msg
284        self.status = status
285        self.reason = reason
286    def read(self):
287        return ''
288
289class MockHTTPClass:
290    def __init__(self):
291        self.req_headers = []
292        self.data = None
293        self.raise_on_endheaders = False
294        self._tunnel_headers = {}
295
296    def __call__(self, host, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
297        self.host = host
298        self.timeout = timeout
299        return self
300
301    def set_debuglevel(self, level):
302        self.level = level
303
304    def set_tunnel(self, host, port=None, headers=None):
305        self._tunnel_host = host
306        self._tunnel_port = port
307        if headers:
308            self._tunnel_headers = headers
309        else:
310            self._tunnel_headers.clear()
311
312    def request(self, method, url, body=None, headers=None):
313        self.method = method
314        self.selector = url
315        if headers is not None:
316            self.req_headers += headers.items()
317        self.req_headers.sort()
318        if body:
319            self.data = body
320        if self.raise_on_endheaders:
321            import socket
322            raise socket.error()
323
324    def getresponse(self):
325        return MockHTTPResponse(MockFile(), {}, 200, "OK")
326
327    def close(self):
328        pass
329
330class MockHandler:
331    # useful for testing handler machinery
332    # see add_ordered_mock_handlers() docstring
333    handler_order = 500
334    def __init__(self, methods):
335        self._define_methods(methods)
336    def _define_methods(self, methods):
337        for spec in methods:
338            if len(spec) == 2: name, action = spec
339            else: name, action = spec, None
340            meth = FakeMethod(name, action, self.handle)
341            setattr(self.__class__, name, meth)
342    def handle(self, fn_name, action, *args, **kwds):
343        self.parent.calls.append((self, fn_name, args, kwds))
344        if action is None:
345            return None
346        elif action == "return self":
347            return self
348        elif action == "return response":
349            res = MockResponse(200, "OK", {}, "")
350            return res
351        elif action == "return request":
352            return Request("http://blah/")
353        elif action.startswith("error"):
354            code = action[action.rfind(" ")+1:]
355            try:
356                code = int(code)
357            except ValueError:
358                pass
359            res = MockResponse(200, "OK", {}, "")
360            return self.parent.error("http", args[0], res, code, "", {})
361        elif action == "raise":
362            raise urllib2.URLError("blah")
363        assert False
364    def close(self): pass
365    def add_parent(self, parent):
366        self.parent = parent
367        self.parent.calls = []
368    def __lt__(self, other):
369        if not hasattr(other, "handler_order"):
370            # No handler_order, leave in original order.  Yuck.
371            return True
372        return self.handler_order < other.handler_order
373
374def add_ordered_mock_handlers(opener, meth_spec):
375    """Create MockHandlers and add them to an OpenerDirector.
376
377    meth_spec: list of lists of tuples and strings defining methods to define
378    on handlers.  eg:
379
380    [["http_error", "ftp_open"], ["http_open"]]
381
382    defines methods .http_error() and .ftp_open() on one handler, and
383    .http_open() on another.  These methods just record their arguments and
384    return None.  Using a tuple instead of a string causes the method to
385    perform some action (see MockHandler.handle()), eg:
386
387    [["http_error"], [("http_open", "return request")]]
388
389    defines .http_error() on one handler (which simply returns None), and
390    .http_open() on another handler, which returns a Request object.
391
392    """
393    handlers = []
394    count = 0
395    for meths in meth_spec:
396        class MockHandlerSubclass(MockHandler): pass
397        h = MockHandlerSubclass(meths)
398        h.handler_order += count
399        h.add_parent(opener)
400        count = count + 1
401        handlers.append(h)
402        opener.add_handler(h)
403    return handlers
404
405def build_test_opener(*handler_instances):
406    opener = OpenerDirector()
407    for h in handler_instances:
408        opener.add_handler(h)
409    return opener
410
411class MockHTTPHandler(urllib2.BaseHandler):
412    # useful for testing redirections and auth
413    # sends supplied headers and code as first response
414    # sends 200 OK as second response
415    def __init__(self, code, headers):
416        self.code = code
417        self.headers = headers
418        self.reset()
419    def reset(self):
420        self._count = 0
421        self.requests = []
422    def http_open(self, req):
423        import mimetools, copy
424        from StringIO import StringIO
425        self.requests.append(copy.deepcopy(req))
426        if self._count == 0:
427            self._count = self._count + 1
428            name = httplib.responses[self.code]
429            msg = mimetools.Message(StringIO(self.headers))
430            return self.parent.error(
431                "http", req, MockFile(), self.code, name, msg)
432        else:
433            self.req = req
434            msg = mimetools.Message(StringIO("\r\n\r\n"))
435            return MockResponse(200, "OK", msg, "", req.get_full_url())
436
437class MockHTTPSHandler(urllib2.AbstractHTTPHandler):
438    # Useful for testing the Proxy-Authorization request by verifying the
439    # properties of httpcon
440
441    def __init__(self):
442        urllib2.AbstractHTTPHandler.__init__(self)
443        self.httpconn = MockHTTPClass()
444
445    def https_open(self, req):
446        return self.do_open(self.httpconn, req)
447
448class MockPasswordManager:
449    def add_password(self, realm, uri, user, password):
450        self.realm = realm
451        self.url = uri
452        self.user = user
453        self.password = password
454    def find_user_password(self, realm, authuri):
455        self.target_realm = realm
456        self.target_url = authuri
457        return self.user, self.password
458
459
460class OpenerDirectorTests(unittest.TestCase):
461
462    def test_add_non_handler(self):
463        class NonHandler(object):
464            pass
465        self.assertRaises(TypeError,
466                          OpenerDirector().add_handler, NonHandler())
467
468    def test_badly_named_methods(self):
469        # test work-around for three methods that accidentally follow the
470        # naming conventions for handler methods
471        # (*_open() / *_request() / *_response())
472
473        # These used to call the accidentally-named methods, causing a
474        # TypeError in real code; here, returning self from these mock
475        # methods would either cause no exception, or AttributeError.
476
477        from urllib2 import URLError
478
479        o = OpenerDirector()
480        meth_spec = [
481            [("do_open", "return self"), ("proxy_open", "return self")],
482            [("redirect_request", "return self")],
483            ]
484        handlers = add_ordered_mock_handlers(o, meth_spec)
485        o.add_handler(urllib2.UnknownHandler())
486        for scheme in "do", "proxy", "redirect":
487            self.assertRaises(URLError, o.open, scheme+"://example.com/")
488
489    def test_handled(self):
490        # handler returning non-None means no more handlers will be called
491        o = OpenerDirector()
492        meth_spec = [
493            ["http_open", "ftp_open", "http_error_302"],
494            ["ftp_open"],
495            [("http_open", "return self")],
496            [("http_open", "return self")],
497            ]
498        handlers = add_ordered_mock_handlers(o, meth_spec)
499
500        req = Request("http://example.com/")
501        r = o.open(req)
502        # Second .http_open() gets called, third doesn't, since second returned
503        # non-None.  Handlers without .http_open() never get any methods called
504        # on them.
505        # In fact, second mock handler defining .http_open() returns self
506        # (instead of response), which becomes the OpenerDirector's return
507        # value.
508        self.assertEqual(r, handlers[2])
509        calls = [(handlers[0], "http_open"), (handlers[2], "http_open")]
510        for expected, got in zip(calls, o.calls):
511            handler, name, args, kwds = got
512            self.assertEqual((handler, name), expected)
513            self.assertEqual(args, (req,))
514
515    def test_handler_order(self):
516        o = OpenerDirector()
517        handlers = []
518        for meths, handler_order in [
519            ([("http_open", "return self")], 500),
520            (["http_open"], 0),
521            ]:
522            class MockHandlerSubclass(MockHandler): pass
523            h = MockHandlerSubclass(meths)
524            h.handler_order = handler_order
525            handlers.append(h)
526            o.add_handler(h)
527
528        r = o.open("http://example.com/")
529        # handlers called in reverse order, thanks to their sort order
530        self.assertEqual(o.calls[0][0], handlers[1])
531        self.assertEqual(o.calls[1][0], handlers[0])
532
533    def test_raise(self):
534        # raising URLError stops processing of request
535        o = OpenerDirector()
536        meth_spec = [
537            [("http_open", "raise")],
538            [("http_open", "return self")],
539            ]
540        handlers = add_ordered_mock_handlers(o, meth_spec)
541
542        req = Request("http://example.com/")
543        self.assertRaises(urllib2.URLError, o.open, req)
544        self.assertEqual(o.calls, [(handlers[0], "http_open", (req,), {})])
545
546##     def test_error(self):
547##         # XXX this doesn't actually seem to be used in standard library,
548##         #  but should really be tested anyway...
549
550    def test_http_error(self):
551        # XXX http_error_default
552        # http errors are a special case
553        o = OpenerDirector()
554        meth_spec = [
555            [("http_open", "error 302")],
556            [("http_error_400", "raise"), "http_open"],
557            [("http_error_302", "return response"), "http_error_303",
558             "http_error"],
559            [("http_error_302")],
560            ]
561        handlers = add_ordered_mock_handlers(o, meth_spec)
562
563        class Unknown:
564            def __eq__(self, other): return True
565
566        req = Request("http://example.com/")
567        r = o.open(req)
568        assert len(o.calls) == 2
569        calls = [(handlers[0], "http_open", (req,)),
570                 (handlers[2], "http_error_302",
571                  (req, Unknown(), 302, "", {}))]
572        for expected, got in zip(calls, o.calls):
573            handler, method_name, args = expected
574            self.assertEqual((handler, method_name), got[:2])
575            self.assertEqual(args, got[2])
576
577    def test_processors(self):
578        # *_request / *_response methods get called appropriately
579        o = OpenerDirector()
580        meth_spec = [
581            [("http_request", "return request"),
582             ("http_response", "return response")],
583            [("http_request", "return request"),
584             ("http_response", "return response")],
585            ]
586        handlers = add_ordered_mock_handlers(o, meth_spec)
587
588        req = Request("http://example.com/")
589        r = o.open(req)
590        # processor methods are called on *all* handlers that define them,
591        # not just the first handler that handles the request
592        calls = [
593            (handlers[0], "http_request"), (handlers[1], "http_request"),
594            (handlers[0], "http_response"), (handlers[1], "http_response")]
595
596        for i, (handler, name, args, kwds) in enumerate(o.calls):
597            if i < 2:
598                # *_request
599                self.assertEqual((handler, name), calls[i])
600                self.assertEqual(len(args), 1)
601                self.assertIsInstance(args[0], Request)
602            else:
603                # *_response
604                self.assertEqual((handler, name), calls[i])
605                self.assertEqual(len(args), 2)
606                self.assertIsInstance(args[0], Request)
607                # response from opener.open is None, because there's no
608                # handler that defines http_open to handle it
609                if args[1] is not None:
610                    self.assertIsInstance(args[1], MockResponse)
611
612
613def sanepathname2url(path):
614    import urllib
615    urlpath = urllib.pathname2url(path)
616    if os.name == "nt" and urlpath.startswith("///"):
617        urlpath = urlpath[2:]
618    # XXX don't ask me about the mac...
619    return urlpath
620
621class HandlerTests(unittest.TestCase):
622
623    def test_ftp(self):
624        class MockFTPWrapper:
625            def __init__(self, data): self.data = data
626            def retrfile(self, filename, filetype):
627                self.filename, self.filetype = filename, filetype
628                return StringIO.StringIO(self.data), len(self.data)
629            def close(self): pass
630
631        class NullFTPHandler(urllib2.FTPHandler):
632            def __init__(self, data): self.data = data
633            def connect_ftp(self, user, passwd, host, port, dirs,
634                            timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
635                self.user, self.passwd = user, passwd
636                self.host, self.port = host, port
637                self.dirs = dirs
638                self.ftpwrapper = MockFTPWrapper(self.data)
639                return self.ftpwrapper
640
641        import ftplib
642        data = "rheum rhaponicum"
643        h = NullFTPHandler(data)
644        o = h.parent = MockOpener()
645
646        for url, host, port, user, passwd, type_, dirs, filename, mimetype in [
647            ("ftp://localhost/foo/bar/baz.html",
648             "localhost", ftplib.FTP_PORT, "", "", "I",
649             ["foo", "bar"], "baz.html", "text/html"),
650            ("ftp://parrot@localhost/foo/bar/baz.html",
651             "localhost", ftplib.FTP_PORT, "parrot", "", "I",
652             ["foo", "bar"], "baz.html", "text/html"),
653            ("ftp://%25parrot@localhost/foo/bar/baz.html",
654             "localhost", ftplib.FTP_PORT, "%parrot", "", "I",
655             ["foo", "bar"], "baz.html", "text/html"),
656            ("ftp://%2542parrot@localhost/foo/bar/baz.html",
657             "localhost", ftplib.FTP_PORT, "%42parrot", "", "I",
658             ["foo", "bar"], "baz.html", "text/html"),
659            ("ftp://localhost:80/foo/bar/",
660             "localhost", 80, "", "", "D",
661             ["foo", "bar"], "", None),
662            ("ftp://localhost/baz.gif;type=a",
663             "localhost", ftplib.FTP_PORT, "", "", "A",
664             [], "baz.gif", None),  # XXX really this should guess image/gif
665            ]:
666            req = Request(url)
667            req.timeout = None
668            r = h.ftp_open(req)
669            # ftp authentication not yet implemented by FTPHandler
670            self.assertEqual(h.user, user)
671            self.assertEqual(h.passwd, passwd)
672            self.assertEqual(h.host, socket.gethostbyname(host))
673            self.assertEqual(h.port, port)
674            self.assertEqual(h.dirs, dirs)
675            self.assertEqual(h.ftpwrapper.filename, filename)
676            self.assertEqual(h.ftpwrapper.filetype, type_)
677            headers = r.info()
678            self.assertEqual(headers.get("Content-type"), mimetype)
679            self.assertEqual(int(headers["Content-length"]), len(data))
680
681    def test_file(self):
682        import rfc822, socket
683        h = urllib2.FileHandler()
684        o = h.parent = MockOpener()
685
686        TESTFN = test_support.TESTFN
687        urlpath = sanepathname2url(os.path.abspath(TESTFN))
688        towrite = "hello, world\n"
689        urls = [
690            "file://localhost%s" % urlpath,
691            "file://%s" % urlpath,
692            "file://%s%s" % (socket.gethostbyname('localhost'), urlpath),
693            ]
694        try:
695            localaddr = socket.gethostbyname(socket.gethostname())
696        except socket.gaierror:
697            localaddr = ''
698        if localaddr:
699            urls.append("file://%s%s" % (localaddr, urlpath))
700
701        for url in urls:
702            f = open(TESTFN, "wb")
703            try:
704                try:
705                    f.write(towrite)
706                finally:
707                    f.close()
708
709                r = h.file_open(Request(url))
710                try:
711                    data = r.read()
712                    headers = r.info()
713                    respurl = r.geturl()
714                finally:
715                    r.close()
716                stats = os.stat(TESTFN)
717                modified = rfc822.formatdate(stats.st_mtime)
718            finally:
719                os.remove(TESTFN)
720            self.assertEqual(data, towrite)
721            self.assertEqual(headers["Content-type"], "text/plain")
722            self.assertEqual(headers["Content-length"], "13")
723            self.assertEqual(headers["Last-modified"], modified)
724            self.assertEqual(respurl, url)
725
726        for url in [
727            "file://localhost:80%s" % urlpath,
728            "file:///file_does_not_exist.txt",
729            "file://%s:80%s/%s" % (socket.gethostbyname('localhost'),
730                                   os.getcwd(), TESTFN),
731            "file://somerandomhost.ontheinternet.com%s/%s" %
732            (os.getcwd(), TESTFN),
733            ]:
734            try:
735                f = open(TESTFN, "wb")
736                try:
737                    f.write(towrite)
738                finally:
739                    f.close()
740
741                self.assertRaises(urllib2.URLError,
742                                  h.file_open, Request(url))
743            finally:
744                os.remove(TESTFN)
745
746        h = urllib2.FileHandler()
747        o = h.parent = MockOpener()
748        # XXXX why does // mean ftp (and /// mean not ftp!), and where
749        #  is file: scheme specified?  I think this is really a bug, and
750        #  what was intended was to distinguish between URLs like:
751        # file:/blah.txt (a file)
752        # file://localhost/blah.txt (a file)
753        # file:///blah.txt (a file)
754        # file://ftp.example.com/blah.txt (an ftp URL)
755        for url, ftp in [
756            ("file://ftp.example.com//foo.txt", True),
757            ("file://ftp.example.com///foo.txt", False),
758# XXXX bug: fails with OSError, should be URLError
759            ("file://ftp.example.com/foo.txt", False),
760            ("file://somehost//foo/something.txt", True),
761            ("file://localhost//foo/something.txt", False),
762            ]:
763            req = Request(url)
764            try:
765                h.file_open(req)
766            # XXXX remove OSError when bug fixed
767            except (urllib2.URLError, OSError):
768                self.assertTrue(not ftp)
769            else:
770                self.assertTrue(o.req is req)
771                self.assertEqual(req.type, "ftp")
772            self.assertEqual(req.type == "ftp", ftp)
773
774    def test_http(self):
775
776        h = urllib2.AbstractHTTPHandler()
777        o = h.parent = MockOpener()
778
779        url = "http://example.com/"
780        for method, data in [("GET", None), ("POST", "blah")]:
781            req = Request(url, data, {"Foo": "bar"})
782            req.timeout = None
783            req.add_unredirected_header("Spam", "eggs")
784            http = MockHTTPClass()
785            r = h.do_open(http, req)
786
787            # result attributes
788            r.read; r.readline  # wrapped MockFile methods
789            r.info; r.geturl  # addinfourl methods
790            r.code, r.msg == 200, "OK"  # added from MockHTTPClass.getreply()
791            hdrs = r.info()
792            hdrs.get; hdrs.has_key  # r.info() gives dict from .getreply()
793            self.assertEqual(r.geturl(), url)
794
795            self.assertEqual(http.host, "example.com")
796            self.assertEqual(http.level, 0)
797            self.assertEqual(http.method, method)
798            self.assertEqual(http.selector, "/")
799            self.assertEqual(http.req_headers,
800                             [("Connection", "close"),
801                              ("Foo", "bar"), ("Spam", "eggs")])
802            self.assertEqual(http.data, data)
803
804        # check socket.error converted to URLError
805        http.raise_on_endheaders = True
806        self.assertRaises(urllib2.URLError, h.do_open, http, req)
807
808        # check adding of standard headers
809        o.addheaders = [("Spam", "eggs")]
810        for data in "", None:  # POST, GET
811            req = Request("http://example.com/", data)
812            r = MockResponse(200, "OK", {}, "")
813            newreq = h.do_request_(req)
814            if data is None:  # GET
815                self.assertNotIn("Content-length", req.unredirected_hdrs)
816                self.assertNotIn("Content-type", req.unredirected_hdrs)
817            else:  # POST
818                self.assertEqual(req.unredirected_hdrs["Content-length"], "0")
819                self.assertEqual(req.unredirected_hdrs["Content-type"],
820                             "application/x-www-form-urlencoded")
821            # XXX the details of Host could be better tested
822            self.assertEqual(req.unredirected_hdrs["Host"], "example.com")
823            self.assertEqual(req.unredirected_hdrs["Spam"], "eggs")
824
825            # don't clobber existing headers
826            req.add_unredirected_header("Content-length", "foo")
827            req.add_unredirected_header("Content-type", "bar")
828            req.add_unredirected_header("Host", "baz")
829            req.add_unredirected_header("Spam", "foo")
830            newreq = h.do_request_(req)
831            self.assertEqual(req.unredirected_hdrs["Content-length"], "foo")
832            self.assertEqual(req.unredirected_hdrs["Content-type"], "bar")
833            self.assertEqual(req.unredirected_hdrs["Host"], "baz")
834            self.assertEqual(req.unredirected_hdrs["Spam"], "foo")
835
836    def test_http_doubleslash(self):
837        # Checks that the presence of an unnecessary double slash in a url doesn't break anything
838        # Previously, a double slash directly after the host could cause incorrect parsing of the url
839        h = urllib2.AbstractHTTPHandler()
840        o = h.parent = MockOpener()
841
842        data = ""
843        ds_urls = [
844            "http://example.com/foo/bar/baz.html",
845            "http://example.com//foo/bar/baz.html",
846            "http://example.com/foo//bar/baz.html",
847            "http://example.com/foo/bar//baz.html",
848        ]
849
850        for ds_url in ds_urls:
851            ds_req = Request(ds_url, data)
852
853            # Check whether host is determined correctly if there is no proxy
854            np_ds_req = h.do_request_(ds_req)
855            self.assertEqual(np_ds_req.unredirected_hdrs["Host"],"example.com")
856
857            # Check whether host is determined correctly if there is a proxy
858            ds_req.set_proxy("someproxy:3128",None)
859            p_ds_req = h.do_request_(ds_req)
860            self.assertEqual(p_ds_req.unredirected_hdrs["Host"],"example.com")
861
862    def test_fixpath_in_weirdurls(self):
863        # Issue4493: urllib2 to supply '/' when to urls where path does not
864        # start with'/'
865
866        h = urllib2.AbstractHTTPHandler()
867        o = h.parent = MockOpener()
868
869        weird_url = 'http://www.python.org?getspam'
870        req = Request(weird_url)
871        newreq = h.do_request_(req)
872        self.assertEqual(newreq.get_host(),'www.python.org')
873        self.assertEqual(newreq.get_selector(),'/?getspam')
874
875        url_without_path = 'http://www.python.org'
876        req = Request(url_without_path)
877        newreq = h.do_request_(req)
878        self.assertEqual(newreq.get_host(),'www.python.org')
879        self.assertEqual(newreq.get_selector(),'')
880
881    def test_errors(self):
882        h = urllib2.HTTPErrorProcessor()
883        o = h.parent = MockOpener()
884
885        url = "http://example.com/"
886        req = Request(url)
887        # all 2xx are passed through
888        r = MockResponse(200, "OK", {}, "", url)
889        newr = h.http_response(req, r)
890        self.assertTrue(r is newr)
891        self.assertTrue(not hasattr(o, "proto"))  # o.error not called
892        r = MockResponse(202, "Accepted", {}, "", url)
893        newr = h.http_response(req, r)
894        self.assertTrue(r is newr)
895        self.assertTrue(not hasattr(o, "proto"))  # o.error not called
896        r = MockResponse(206, "Partial content", {}, "", url)
897        newr = h.http_response(req, r)
898        self.assertTrue(r is newr)
899        self.assertTrue(not hasattr(o, "proto"))  # o.error not called
900        # anything else calls o.error (and MockOpener returns None, here)
901        r = MockResponse(502, "Bad gateway", {}, "", url)
902        self.assertTrue(h.http_response(req, r) is None)
903        self.assertEqual(o.proto, "http")  # o.error called
904        self.assertEqual(o.args, (req, r, 502, "Bad gateway", {}))
905
906    def test_cookies(self):
907        cj = MockCookieJar()
908        h = urllib2.HTTPCookieProcessor(cj)
909        o = h.parent = MockOpener()
910
911        req = Request("http://example.com/")
912        r = MockResponse(200, "OK", {}, "")
913        newreq = h.http_request(req)
914        self.assertTrue(cj.ach_req is req is newreq)
915        self.assertEqual(req.get_origin_req_host(), "example.com")
916        self.assertTrue(not req.is_unverifiable())
917        newr = h.http_response(req, r)
918        self.assertTrue(cj.ec_req is req)
919        self.assertTrue(cj.ec_r is r is newr)
920
921    def test_redirect(self):
922        from_url = "http://example.com/a.html"
923        to_url = "http://example.com/b.html"
924        h = urllib2.HTTPRedirectHandler()
925        o = h.parent = MockOpener()
926
927        # ordinary redirect behaviour
928        for code in 301, 302, 303, 307:
929            for data in None, "blah\nblah\n":
930                method = getattr(h, "http_error_%s" % code)
931                req = Request(from_url, data)
932                req.add_header("Nonsense", "viking=withhold")
933                req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
934                if data is not None:
935                    req.add_header("Content-Length", str(len(data)))
936                req.add_unredirected_header("Spam", "spam")
937                try:
938                    method(req, MockFile(), code, "Blah",
939                           MockHeaders({"location": to_url}))
940                except urllib2.HTTPError:
941                    # 307 in response to POST requires user OK
942                    self.assertEqual(code, 307)
943                    self.assertIsNotNone(data)
944                self.assertEqual(o.req.get_full_url(), to_url)
945                try:
946                    self.assertEqual(o.req.get_method(), "GET")
947                except AttributeError:
948                    self.assertTrue(not o.req.has_data())
949
950                # now it's a GET, there should not be headers regarding content
951                # (possibly dragged from before being a POST)
952                headers = [x.lower() for x in o.req.headers]
953                self.assertNotIn("content-length", headers)
954                self.assertNotIn("content-type", headers)
955
956                self.assertEqual(o.req.headers["Nonsense"],
957                                 "viking=withhold")
958                self.assertNotIn("Spam", o.req.headers)
959                self.assertNotIn("Spam", o.req.unredirected_hdrs)
960
961        # loop detection
962        req = Request(from_url)
963        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
964        def redirect(h, req, url=to_url):
965            h.http_error_302(req, MockFile(), 302, "Blah",
966                             MockHeaders({"location": url}))
967        # Note that the *original* request shares the same record of
968        # redirections with the sub-requests caused by the redirections.
969
970        # detect infinite loop redirect of a URL to itself
971        req = Request(from_url, origin_req_host="example.com")
972        count = 0
973        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
974        try:
975            while 1:
976                redirect(h, req, "http://example.com/")
977                count = count + 1
978        except urllib2.HTTPError:
979            # don't stop until max_repeats, because cookies may introduce state
980            self.assertEqual(count, urllib2.HTTPRedirectHandler.max_repeats)
981
982        # detect endless non-repeating chain of redirects
983        req = Request(from_url, origin_req_host="example.com")
984        count = 0
985        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
986        try:
987            while 1:
988                redirect(h, req, "http://example.com/%d" % count)
989                count = count + 1
990        except urllib2.HTTPError:
991            self.assertEqual(count,
992                             urllib2.HTTPRedirectHandler.max_redirections)
993
994    def test_invalid_redirect(self):
995        from_url = "http://example.com/a.html"
996        valid_schemes = ['http', 'https', 'ftp']
997        invalid_schemes = ['file', 'imap', 'ldap']
998        schemeless_url = "example.com/b.html"
999        h = urllib2.HTTPRedirectHandler()
1000        o = h.parent = MockOpener()
1001        req = Request(from_url)
1002        req.timeout = socket._GLOBAL_DEFAULT_TIMEOUT
1003
1004        for scheme in invalid_schemes:
1005            invalid_url = scheme + '://' + schemeless_url
1006            self.assertRaises(urllib2.HTTPError, h.http_error_302,
1007                              req, MockFile(), 302, "Security Loophole",
1008                              MockHeaders({"location": invalid_url}))
1009
1010        for scheme in valid_schemes:
1011            valid_url = scheme + '://' + schemeless_url
1012            h.http_error_302(req, MockFile(), 302, "That's fine",
1013                MockHeaders({"location": valid_url}))
1014            self.assertEqual(o.req.get_full_url(), valid_url)
1015
1016    def test_cookie_redirect(self):
1017        # cookies shouldn't leak into redirected requests
1018        from cookielib import CookieJar
1019
1020        from test.test_cookielib import interact_netscape
1021
1022        cj = CookieJar()
1023        interact_netscape(cj, "http://www.example.com/", "spam=eggs")
1024        hh = MockHTTPHandler(302, "Location: http://www.cracker.com/\r\n\r\n")
1025        hdeh = urllib2.HTTPDefaultErrorHandler()
1026        hrh = urllib2.HTTPRedirectHandler()
1027        cp = urllib2.HTTPCookieProcessor(cj)
1028        o = build_test_opener(hh, hdeh, hrh, cp)
1029        o.open("http://www.example.com/")
1030        self.assertTrue(not hh.req.has_header("Cookie"))
1031
1032    def test_redirect_fragment(self):
1033        redirected_url = 'http://www.example.com/index.html#OK\r\n\r\n'
1034        hh = MockHTTPHandler(302, 'Location: ' + redirected_url)
1035        hdeh = urllib2.HTTPDefaultErrorHandler()
1036        hrh = urllib2.HTTPRedirectHandler()
1037        o = build_test_opener(hh, hdeh, hrh)
1038        fp = o.open('http://www.example.com')
1039        self.assertEqual(fp.geturl(), redirected_url.strip())
1040
1041    def test_redirect_no_path(self):
1042        # Issue 14132: Relative redirect strips original path
1043        real_class = httplib.HTTPConnection
1044        response1 = b"HTTP/1.1 302 Found\r\nLocation: ?query\r\n\r\n"
1045        httplib.HTTPConnection = test_urllib.fakehttp(response1)
1046        self.addCleanup(setattr, httplib, "HTTPConnection", real_class)
1047        urls = iter(("/path", "/path?query"))
1048        def request(conn, method, url, *pos, **kw):
1049            self.assertEqual(url, next(urls))
1050            real_class.request(conn, method, url, *pos, **kw)
1051            # Change response for subsequent connection
1052            conn.__class__.fakedata = b"HTTP/1.1 200 OK\r\n\r\nHello!"
1053        httplib.HTTPConnection.request = request
1054        fp = urllib2.urlopen("http://python.org/path")
1055        self.assertEqual(fp.geturl(), "http://python.org/path?query")
1056
1057    def test_proxy(self):
1058        o = OpenerDirector()
1059        ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
1060        o.add_handler(ph)
1061        meth_spec = [
1062            [("http_open", "return response")]
1063            ]
1064        handlers = add_ordered_mock_handlers(o, meth_spec)
1065
1066        req = Request("http://acme.example.com/")
1067        self.assertEqual(req.get_host(), "acme.example.com")
1068        r = o.open(req)
1069        self.assertEqual(req.get_host(), "proxy.example.com:3128")
1070
1071        self.assertEqual([(handlers[0], "http_open")],
1072                         [tup[0:2] for tup in o.calls])
1073
1074    def test_proxy_no_proxy(self):
1075        os.environ['no_proxy'] = 'python.org'
1076        o = OpenerDirector()
1077        ph = urllib2.ProxyHandler(dict(http="proxy.example.com"))
1078        o.add_handler(ph)
1079        req = Request("http://www.perl.org/")
1080        self.assertEqual(req.get_host(), "www.perl.org")
1081        r = o.open(req)
1082        self.assertEqual(req.get_host(), "proxy.example.com")
1083        req = Request("http://www.python.org")
1084        self.assertEqual(req.get_host(), "www.python.org")
1085        r = o.open(req)
1086        self.assertEqual(req.get_host(), "www.python.org")
1087        del os.environ['no_proxy']
1088
1089
1090    def test_proxy_https(self):
1091        o = OpenerDirector()
1092        ph = urllib2.ProxyHandler(dict(https='proxy.example.com:3128'))
1093        o.add_handler(ph)
1094        meth_spec = [
1095            [("https_open","return response")]
1096        ]
1097        handlers = add_ordered_mock_handlers(o, meth_spec)
1098        req = Request("https://www.example.com/")
1099        self.assertEqual(req.get_host(), "www.example.com")
1100        r = o.open(req)
1101        self.assertEqual(req.get_host(), "proxy.example.com:3128")
1102        self.assertEqual([(handlers[0], "https_open")],
1103                         [tup[0:2] for tup in o.calls])
1104
1105    def test_proxy_https_proxy_authorization(self):
1106        o = OpenerDirector()
1107        ph = urllib2.ProxyHandler(dict(https='proxy.example.com:3128'))
1108        o.add_handler(ph)
1109        https_handler = MockHTTPSHandler()
1110        o.add_handler(https_handler)
1111        req = Request("https://www.example.com/")
1112        req.add_header("Proxy-Authorization","FooBar")
1113        req.add_header("User-Agent","Grail")
1114        self.assertEqual(req.get_host(), "www.example.com")
1115        self.assertIsNone(req._tunnel_host)
1116        r = o.open(req)
1117        # Verify Proxy-Authorization gets tunneled to request.
1118        # httpsconn req_headers do not have the Proxy-Authorization header but
1119        # the req will have.
1120        self.assertNotIn(("Proxy-Authorization","FooBar"),
1121                         https_handler.httpconn.req_headers)
1122        self.assertIn(("User-Agent","Grail"),
1123                      https_handler.httpconn.req_headers)
1124        self.assertIsNotNone(req._tunnel_host)
1125        self.assertEqual(req.get_host(), "proxy.example.com:3128")
1126        self.assertEqual(req.get_header("Proxy-authorization"),"FooBar")
1127
1128    def test_basic_auth(self, quote_char='"'):
1129        opener = OpenerDirector()
1130        password_manager = MockPasswordManager()
1131        auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
1132        realm = "ACME Widget Store"
1133        http_handler = MockHTTPHandler(
1134            401, 'WWW-Authenticate: Basic realm=%s%s%s\r\n\r\n' %
1135            (quote_char, realm, quote_char) )
1136        opener.add_handler(auth_handler)
1137        opener.add_handler(http_handler)
1138        self._test_basic_auth(opener, auth_handler, "Authorization",
1139                              realm, http_handler, password_manager,
1140                              "http://acme.example.com/protected",
1141                              "http://acme.example.com/protected"
1142                             )
1143
1144    def test_basic_auth_with_single_quoted_realm(self):
1145        self.test_basic_auth(quote_char="'")
1146
1147    def test_basic_auth_with_unquoted_realm(self):
1148        opener = OpenerDirector()
1149        password_manager = MockPasswordManager()
1150        auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
1151        realm = "ACME Widget Store"
1152        http_handler = MockHTTPHandler(
1153            401, 'WWW-Authenticate: Basic realm=%s\r\n\r\n' % realm)
1154        opener.add_handler(auth_handler)
1155        opener.add_handler(http_handler)
1156        msg = "Basic Auth Realm was unquoted"
1157        with test_support.check_warnings((msg, UserWarning)):
1158            self._test_basic_auth(opener, auth_handler, "Authorization",
1159                                  realm, http_handler, password_manager,
1160                                  "http://acme.example.com/protected",
1161                                  "http://acme.example.com/protected"
1162                                 )
1163
1164
1165    def test_proxy_basic_auth(self):
1166        opener = OpenerDirector()
1167        ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
1168        opener.add_handler(ph)
1169        password_manager = MockPasswordManager()
1170        auth_handler = urllib2.ProxyBasicAuthHandler(password_manager)
1171        realm = "ACME Networks"
1172        http_handler = MockHTTPHandler(
1173            407, 'Proxy-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
1174        opener.add_handler(auth_handler)
1175        opener.add_handler(http_handler)
1176        self._test_basic_auth(opener, auth_handler, "Proxy-authorization",
1177                              realm, http_handler, password_manager,
1178                              "http://acme.example.com:3128/protected",
1179                              "proxy.example.com:3128",
1180                              )
1181
1182    def test_basic_and_digest_auth_handlers(self):
1183        # HTTPDigestAuthHandler raised an exception if it couldn't handle a 40*
1184        # response (http://python.org/sf/1479302), where it should instead
1185        # return None to allow another handler (especially
1186        # HTTPBasicAuthHandler) to handle the response.
1187
1188        # Also (http://python.org/sf/14797027, RFC 2617 section 1.2), we must
1189        # try digest first (since it's the strongest auth scheme), so we record
1190        # order of calls here to check digest comes first:
1191        class RecordingOpenerDirector(OpenerDirector):
1192            def __init__(self):
1193                OpenerDirector.__init__(self)
1194                self.recorded = []
1195            def record(self, info):
1196                self.recorded.append(info)
1197        class TestDigestAuthHandler(urllib2.HTTPDigestAuthHandler):
1198            def http_error_401(self, *args, **kwds):
1199                self.parent.record("digest")
1200                urllib2.HTTPDigestAuthHandler.http_error_401(self,
1201                                                             *args, **kwds)
1202        class TestBasicAuthHandler(urllib2.HTTPBasicAuthHandler):
1203            def http_error_401(self, *args, **kwds):
1204                self.parent.record("basic")
1205                urllib2.HTTPBasicAuthHandler.http_error_401(self,
1206                                                            *args, **kwds)
1207
1208        opener = RecordingOpenerDirector()
1209        password_manager = MockPasswordManager()
1210        digest_handler = TestDigestAuthHandler(password_manager)
1211        basic_handler = TestBasicAuthHandler(password_manager)
1212        realm = "ACME Networks"
1213        http_handler = MockHTTPHandler(
1214            401, 'WWW-Authenticate: Basic realm="%s"\r\n\r\n' % realm)
1215        opener.add_handler(basic_handler)
1216        opener.add_handler(digest_handler)
1217        opener.add_handler(http_handler)
1218
1219        # check basic auth isn't blocked by digest handler failing
1220        self._test_basic_auth(opener, basic_handler, "Authorization",
1221                              realm, http_handler, password_manager,
1222                              "http://acme.example.com/protected",
1223                              "http://acme.example.com/protected",
1224                              )
1225        # check digest was tried before basic (twice, because
1226        # _test_basic_auth called .open() twice)
1227        self.assertEqual(opener.recorded, ["digest", "basic"]*2)
1228
1229    def _test_basic_auth(self, opener, auth_handler, auth_header,
1230                         realm, http_handler, password_manager,
1231                         request_url, protected_url):
1232        import base64
1233        user, password = "wile", "coyote"
1234
1235        # .add_password() fed through to password manager
1236        auth_handler.add_password(realm, request_url, user, password)
1237        self.assertEqual(realm, password_manager.realm)
1238        self.assertEqual(request_url, password_manager.url)
1239        self.assertEqual(user, password_manager.user)
1240        self.assertEqual(password, password_manager.password)
1241
1242        r = opener.open(request_url)
1243
1244        # should have asked the password manager for the username/password
1245        self.assertEqual(password_manager.target_realm, realm)
1246        self.assertEqual(password_manager.target_url, protected_url)
1247
1248        # expect one request without authorization, then one with
1249        self.assertEqual(len(http_handler.requests), 2)
1250        self.assertFalse(http_handler.requests[0].has_header(auth_header))
1251        userpass = '%s:%s' % (user, password)
1252        auth_hdr_value = 'Basic '+base64.encodestring(userpass).strip()
1253        self.assertEqual(http_handler.requests[1].get_header(auth_header),
1254                         auth_hdr_value)
1255        self.assertEqual(http_handler.requests[1].unredirected_hdrs[auth_header],
1256                         auth_hdr_value)
1257        # if the password manager can't find a password, the handler won't
1258        # handle the HTTP auth error
1259        password_manager.user = password_manager.password = None
1260        http_handler.reset()
1261        r = opener.open(request_url)
1262        self.assertEqual(len(http_handler.requests), 1)
1263        self.assertFalse(http_handler.requests[0].has_header(auth_header))
1264
1265class MiscTests(unittest.TestCase):
1266
1267    def test_build_opener(self):
1268        class MyHTTPHandler(urllib2.HTTPHandler): pass
1269        class FooHandler(urllib2.BaseHandler):
1270            def foo_open(self): pass
1271        class BarHandler(urllib2.BaseHandler):
1272            def bar_open(self): pass
1273
1274        build_opener = urllib2.build_opener
1275
1276        o = build_opener(FooHandler, BarHandler)
1277        self.opener_has_handler(o, FooHandler)
1278        self.opener_has_handler(o, BarHandler)
1279
1280        # can take a mix of classes and instances
1281        o = build_opener(FooHandler, BarHandler())
1282        self.opener_has_handler(o, FooHandler)
1283        self.opener_has_handler(o, BarHandler)
1284
1285        # subclasses of default handlers override default handlers
1286        o = build_opener(MyHTTPHandler)
1287        self.opener_has_handler(o, MyHTTPHandler)
1288
1289        # a particular case of overriding: default handlers can be passed
1290        # in explicitly
1291        o = build_opener()
1292        self.opener_has_handler(o, urllib2.HTTPHandler)
1293        o = build_opener(urllib2.HTTPHandler)
1294        self.opener_has_handler(o, urllib2.HTTPHandler)
1295        o = build_opener(urllib2.HTTPHandler())
1296        self.opener_has_handler(o, urllib2.HTTPHandler)
1297
1298        # Issue2670: multiple handlers sharing the same base class
1299        class MyOtherHTTPHandler(urllib2.HTTPHandler): pass
1300        o = build_opener(MyHTTPHandler, MyOtherHTTPHandler)
1301        self.opener_has_handler(o, MyHTTPHandler)
1302        self.opener_has_handler(o, MyOtherHTTPHandler)
1303
1304    def opener_has_handler(self, opener, handler_class):
1305        for h in opener.handlers:
1306            if h.__class__ == handler_class:
1307                break
1308        else:
1309            self.assertTrue(False)
1310
1311    def test_unsupported_algorithm(self):
1312        handler = AbstractDigestAuthHandler()
1313        with self.assertRaises(ValueError) as exc:
1314            handler.get_algorithm_impls('invalid')
1315        self.assertEqual(
1316            str(exc.exception),
1317            "Unsupported digest authentication algorithm 'invalid'"
1318        )
1319
1320
1321class RequestTests(unittest.TestCase):
1322
1323    def setUp(self):
1324        self.get = urllib2.Request("http://www.python.org/~jeremy/")
1325        self.post = urllib2.Request("http://www.python.org/~jeremy/",
1326                                    "data",
1327                                    headers={"X-Test": "test"})
1328
1329    def test_method(self):
1330        self.assertEqual("POST", self.post.get_method())
1331        self.assertEqual("GET", self.get.get_method())
1332
1333    def test_add_data(self):
1334        self.assertTrue(not self.get.has_data())
1335        self.assertEqual("GET", self.get.get_method())
1336        self.get.add_data("spam")
1337        self.assertTrue(self.get.has_data())
1338        self.assertEqual("POST", self.get.get_method())
1339
1340    def test_get_full_url(self):
1341        self.assertEqual("http://www.python.org/~jeremy/",
1342                         self.get.get_full_url())
1343
1344    def test_selector(self):
1345        self.assertEqual("/~jeremy/", self.get.get_selector())
1346        req = urllib2.Request("http://www.python.org/")
1347        self.assertEqual("/", req.get_selector())
1348
1349    def test_get_type(self):
1350        self.assertEqual("http", self.get.get_type())
1351
1352    def test_get_host(self):
1353        self.assertEqual("www.python.org", self.get.get_host())
1354
1355    def test_get_host_unquote(self):
1356        req = urllib2.Request("http://www.%70ython.org/")
1357        self.assertEqual("www.python.org", req.get_host())
1358
1359    def test_proxy(self):
1360        self.assertTrue(not self.get.has_proxy())
1361        self.get.set_proxy("www.perl.org", "http")
1362        self.assertTrue(self.get.has_proxy())
1363        self.assertEqual("www.python.org", self.get.get_origin_req_host())
1364        self.assertEqual("www.perl.org", self.get.get_host())
1365
1366    def test_wrapped_url(self):
1367        req = Request("<URL:http://www.python.org>")
1368        self.assertEqual("www.python.org", req.get_host())
1369
1370    def test_url_fragment(self):
1371        req = Request("http://www.python.org/?qs=query#fragment=true")
1372        self.assertEqual("/?qs=query", req.get_selector())
1373        req = Request("http://www.python.org/#fun=true")
1374        self.assertEqual("/", req.get_selector())
1375
1376        # Issue 11703: geturl() omits fragment in the original URL.
1377        url = 'http://docs.python.org/library/urllib2.html#OK'
1378        req = Request(url)
1379        self.assertEqual(req.get_full_url(), url)
1380
1381    def test_private_attributes(self):
1382        self.assertFalse(hasattr(self.get, '_Request__r_xxx'))
1383        # Issue #6500: infinite recursion
1384        self.assertFalse(hasattr(self.get, '_Request__r_method'))
1385
1386    def test_HTTPError_interface(self):
1387        """
1388        Issue 13211 reveals that HTTPError didn't implement the URLError
1389        interface even though HTTPError is a subclass of URLError.
1390
1391        >>> err = urllib2.HTTPError(msg='something bad happened', url=None, code=None, hdrs=None, fp=None)
1392        >>> assert hasattr(err, 'reason')
1393        >>> err.reason
1394        'something bad happened'
1395        """
1396
1397    def test_HTTPError_interface_call(self):
1398        """
1399        Issue 15701= - HTTPError interface has info method available from URLError.
1400        """
1401        err = urllib2.HTTPError(msg='something bad happened', url=None,
1402                                code=None, hdrs='Content-Length:42', fp=None)
1403        self.assertTrue(hasattr(err, 'reason'))
1404        assert hasattr(err, 'reason')
1405        assert hasattr(err, 'info')
1406        assert callable(err.info)
1407        try:
1408            err.info()
1409        except AttributeError:
1410            self.fail("err.info() failed")
1411        self.assertEqual(err.info(), "Content-Length:42")
1412
1413def test_main(verbose=None):
1414    from test import test_urllib2
1415    test_support.run_doctest(test_urllib2, verbose)
1416    test_support.run_doctest(urllib2, verbose)
1417    tests = (TrivialTests,
1418             OpenerDirectorTests,
1419             HandlerTests,
1420             MiscTests,
1421             RequestTests)
1422    test_support.run_unittest(*tests)
1423
1424if __name__ == "__main__":
1425    test_main(verbose=True)
1426