• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import errno
2import unittest
3from test import support
4from test.support import os_helper
5from test.support import socket_helper
6from test.support import ResourceDenied
7from test.test_urllib2 import sanepathname2url
8
9import os
10import socket
11import urllib.error
12import urllib.request
13import sys
14
15support.requires("network")
16
17
18def _retry_thrice(func, exc, *args, **kwargs):
19    for i in range(3):
20        try:
21            return func(*args, **kwargs)
22        except exc as e:
23            last_exc = e
24            continue
25    raise last_exc
26
27def _wrap_with_retry_thrice(func, exc):
28    def wrapped(*args, **kwargs):
29        return _retry_thrice(func, exc, *args, **kwargs)
30    return wrapped
31
32# Connecting to remote hosts is flaky.  Make it more robust by retrying
33# the connection several times.
34_urlopen_with_retry = _wrap_with_retry_thrice(urllib.request.urlopen,
35                                              urllib.error.URLError)
36
37
38class TransientResource(object):
39
40    """Raise ResourceDenied if an exception is raised while the context manager
41    is in effect that matches the specified exception and attributes."""
42
43    def __init__(self, exc, **kwargs):
44        self.exc = exc
45        self.attrs = kwargs
46
47    def __enter__(self):
48        return self
49
50    def __exit__(self, type_=None, value=None, traceback=None):
51        """If type_ is a subclass of self.exc and value has attributes matching
52        self.attrs, raise ResourceDenied.  Otherwise let the exception
53        propagate (if any)."""
54        if type_ is not None and issubclass(self.exc, type_):
55            for attr, attr_value in self.attrs.items():
56                if not hasattr(value, attr):
57                    break
58                if getattr(value, attr) != attr_value:
59                    break
60            else:
61                raise ResourceDenied("an optional resource is not available")
62
63# Context managers that raise ResourceDenied when various issues
64# with the internet connection manifest themselves as exceptions.
65# XXX deprecate these and use transient_internet() instead
66time_out = TransientResource(OSError, errno=errno.ETIMEDOUT)
67socket_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
68ioerror_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
69
70
71class AuthTests(unittest.TestCase):
72    """Tests urllib2 authentication features."""
73
74## Disabled at the moment since there is no page under python.org which
75## could be used to HTTP authentication.
76#
77#    def test_basic_auth(self):
78#        import http.client
79#
80#        test_url = "http://www.python.org/test/test_urllib2/basic_auth"
81#        test_hostport = "www.python.org"
82#        test_realm = 'Test Realm'
83#        test_user = 'test.test_urllib2net'
84#        test_password = 'blah'
85#
86#        # failure
87#        try:
88#            _urlopen_with_retry(test_url)
89#        except urllib2.HTTPError, exc:
90#            self.assertEqual(exc.code, 401)
91#        else:
92#            self.fail("urlopen() should have failed with 401")
93#
94#        # success
95#        auth_handler = urllib2.HTTPBasicAuthHandler()
96#        auth_handler.add_password(test_realm, test_hostport,
97#                                  test_user, test_password)
98#        opener = urllib2.build_opener(auth_handler)
99#        f = opener.open('http://localhost/')
100#        response = _urlopen_with_retry("http://www.python.org/")
101#
102#        # The 'userinfo' URL component is deprecated by RFC 3986 for security
103#        # reasons, let's not implement it!  (it's already implemented for proxy
104#        # specification strings (that is, URLs or authorities specifying a
105#        # proxy), so we must keep that)
106#        self.assertRaises(http.client.InvalidURL,
107#                          urllib2.urlopen, "http://evil:thing@example.com")
108
109
110class CloseSocketTest(unittest.TestCase):
111
112    def test_close(self):
113        # clear _opener global variable
114        self.addCleanup(urllib.request.urlcleanup)
115
116        # calling .close() on urllib2's response objects should close the
117        # underlying socket
118        url = support.TEST_HTTP_URL
119        with socket_helper.transient_internet(url):
120            response = _urlopen_with_retry(url)
121            sock = response.fp
122            self.assertFalse(sock.closed)
123            response.close()
124            self.assertTrue(sock.closed)
125
126class OtherNetworkTests(unittest.TestCase):
127    def setUp(self):
128        if 0:  # for debugging
129            import logging
130            logger = logging.getLogger("test_urllib2net")
131            logger.addHandler(logging.StreamHandler())
132
133    # XXX The rest of these tests aren't very good -- they don't check much.
134    # They do sometimes catch some major disasters, though.
135
136    @support.requires_resource('walltime')
137    def test_ftp(self):
138        # Testing the same URL twice exercises the caching in CacheFTPHandler
139        urls = [
140            'ftp://www.pythontest.net/README',
141            'ftp://www.pythontest.net/README',
142            ('ftp://www.pythontest.net/non-existent-file',
143             None, urllib.error.URLError),
144            ]
145        self._test_urls(urls, self._extra_handlers())
146
147    def test_file(self):
148        TESTFN = os_helper.TESTFN
149        f = open(TESTFN, 'w')
150        try:
151            f.write('hi there\n')
152            f.close()
153            urls = [
154                'file:' + sanepathname2url(os.path.abspath(TESTFN)),
155                ('file:///nonsensename/etc/passwd', None,
156                 urllib.error.URLError),
157                ]
158            self._test_urls(urls, self._extra_handlers(), retry=True)
159        finally:
160            os.remove(TESTFN)
161
162        self.assertRaises(ValueError, urllib.request.urlopen,'./relative_path/to/file')
163
164    # XXX Following test depends on machine configurations that are internal
165    # to CNRI.  Need to set up a public server with the right authentication
166    # configuration for test purposes.
167
168##     def test_cnri(self):
169##         if socket.gethostname() == 'bitdiddle':
170##             localhost = 'bitdiddle.cnri.reston.va.us'
171##         elif socket.gethostname() == 'bitdiddle.concentric.net':
172##             localhost = 'localhost'
173##         else:
174##             localhost = None
175##         if localhost is not None:
176##             urls = [
177##                 'file://%s/etc/passwd' % localhost,
178##                 'http://%s/simple/' % localhost,
179##                 'http://%s/digest/' % localhost,
180##                 'http://%s/not/found.h' % localhost,
181##                 ]
182
183##             bauth = HTTPBasicAuthHandler()
184##             bauth.add_password('basic_test_realm', localhost, 'jhylton',
185##                                'password')
186##             dauth = HTTPDigestAuthHandler()
187##             dauth.add_password('digest_test_realm', localhost, 'jhylton',
188##                                'password')
189
190##             self._test_urls(urls, self._extra_handlers()+[bauth, dauth])
191
192    def test_urlwithfrag(self):
193        urlwith_frag = "http://www.pythontest.net/index.html#frag"
194        with socket_helper.transient_internet(urlwith_frag):
195            req = urllib.request.Request(urlwith_frag)
196            res = urllib.request.urlopen(req)
197            self.assertEqual(res.geturl(),
198                    "http://www.pythontest.net/index.html#frag")
199
200    @support.requires_resource('walltime')
201    def test_redirect_url_withfrag(self):
202        redirect_url_with_frag = "http://www.pythontest.net/redir/with_frag/"
203        with socket_helper.transient_internet(redirect_url_with_frag):
204            req = urllib.request.Request(redirect_url_with_frag)
205            res = urllib.request.urlopen(req)
206            self.assertEqual(res.geturl(),
207                    "http://www.pythontest.net/elsewhere/#frag")
208
209    def test_custom_headers(self):
210        url = support.TEST_HTTP_URL
211        with socket_helper.transient_internet(url):
212            opener = urllib.request.build_opener()
213            request = urllib.request.Request(url)
214            self.assertFalse(request.header_items())
215            opener.open(request)
216            self.assertTrue(request.header_items())
217            self.assertTrue(request.has_header('User-agent'))
218            request.add_header('User-Agent','Test-Agent')
219            opener.open(request)
220            self.assertEqual(request.get_header('User-agent'),'Test-Agent')
221
222    @unittest.skip('XXX: http://www.imdb.com is gone')
223    def test_sites_no_connection_close(self):
224        # Some sites do not send Connection: close header.
225        # Verify that those work properly. (#issue12576)
226
227        URL = 'http://www.imdb.com' # mangles Connection:close
228
229        with socket_helper.transient_internet(URL):
230            try:
231                with urllib.request.urlopen(URL) as res:
232                    pass
233            except ValueError:
234                self.fail("urlopen failed for site not sending \
235                           Connection:close")
236            else:
237                self.assertTrue(res)
238
239            req = urllib.request.urlopen(URL)
240            res = req.read()
241            self.assertTrue(res)
242
243    def _test_urls(self, urls, handlers, retry=True):
244        import time
245        import logging
246        debug = logging.getLogger("test_urllib2").debug
247
248        urlopen = urllib.request.build_opener(*handlers).open
249        if retry:
250            urlopen = _wrap_with_retry_thrice(urlopen, urllib.error.URLError)
251
252        for url in urls:
253            with self.subTest(url=url):
254                if isinstance(url, tuple):
255                    url, req, expected_err = url
256                else:
257                    req = expected_err = None
258
259                with socket_helper.transient_internet(url):
260                    try:
261                        f = urlopen(url, req, support.INTERNET_TIMEOUT)
262                    # urllib.error.URLError is a subclass of OSError
263                    except OSError as err:
264                        if expected_err:
265                            msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
266                                   (expected_err, url, req, type(err), err))
267                            self.assertIsInstance(err, expected_err, msg)
268                        else:
269                            raise
270                    else:
271                        try:
272                            with time_out, \
273                                 socket_peer_reset, \
274                                 ioerror_peer_reset:
275                                buf = f.read()
276                                debug("read %d bytes" % len(buf))
277                        except TimeoutError:
278                            print("<timeout: %s>" % url, file=sys.stderr)
279                        f.close()
280                time.sleep(0.1)
281
282    def _extra_handlers(self):
283        handlers = []
284
285        cfh = urllib.request.CacheFTPHandler()
286        self.addCleanup(cfh.clear_cache)
287        cfh.setTimeout(1)
288        handlers.append(cfh)
289
290        return handlers
291
292
293class TimeoutTest(unittest.TestCase):
294    def setUp(self):
295        # clear _opener global variable
296        self.addCleanup(urllib.request.urlcleanup)
297
298    def test_http_basic(self):
299        self.assertIsNone(socket.getdefaulttimeout())
300        url = support.TEST_HTTP_URL
301        with socket_helper.transient_internet(url, timeout=None):
302            u = _urlopen_with_retry(url)
303            self.addCleanup(u.close)
304            self.assertIsNone(u.fp.raw._sock.gettimeout())
305
306    def test_http_default_timeout(self):
307        self.assertIsNone(socket.getdefaulttimeout())
308        url = support.TEST_HTTP_URL
309        with socket_helper.transient_internet(url):
310            socket.setdefaulttimeout(60)
311            try:
312                u = _urlopen_with_retry(url)
313                self.addCleanup(u.close)
314            finally:
315                socket.setdefaulttimeout(None)
316            self.assertEqual(u.fp.raw._sock.gettimeout(), 60)
317
318    def test_http_no_timeout(self):
319        self.assertIsNone(socket.getdefaulttimeout())
320        url = support.TEST_HTTP_URL
321        with socket_helper.transient_internet(url):
322            socket.setdefaulttimeout(60)
323            try:
324                u = _urlopen_with_retry(url, timeout=None)
325                self.addCleanup(u.close)
326            finally:
327                socket.setdefaulttimeout(None)
328            self.assertIsNone(u.fp.raw._sock.gettimeout())
329
330    def test_http_timeout(self):
331        url = support.TEST_HTTP_URL
332        with socket_helper.transient_internet(url):
333            u = _urlopen_with_retry(url, timeout=120)
334            self.addCleanup(u.close)
335            self.assertEqual(u.fp.raw._sock.gettimeout(), 120)
336
337    FTP_HOST = 'ftp://www.pythontest.net/'
338
339    @support.requires_resource('walltime')
340    def test_ftp_basic(self):
341        self.assertIsNone(socket.getdefaulttimeout())
342        with socket_helper.transient_internet(self.FTP_HOST, timeout=None):
343            u = _urlopen_with_retry(self.FTP_HOST)
344            self.addCleanup(u.close)
345            self.assertIsNone(u.fp.fp.raw._sock.gettimeout())
346
347    def test_ftp_default_timeout(self):
348        self.assertIsNone(socket.getdefaulttimeout())
349        with socket_helper.transient_internet(self.FTP_HOST):
350            socket.setdefaulttimeout(60)
351            try:
352                u = _urlopen_with_retry(self.FTP_HOST)
353                self.addCleanup(u.close)
354            finally:
355                socket.setdefaulttimeout(None)
356            self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60)
357
358    @support.requires_resource('walltime')
359    def test_ftp_no_timeout(self):
360        self.assertIsNone(socket.getdefaulttimeout())
361        with socket_helper.transient_internet(self.FTP_HOST):
362            socket.setdefaulttimeout(60)
363            try:
364                u = _urlopen_with_retry(self.FTP_HOST, timeout=None)
365                self.addCleanup(u.close)
366            finally:
367                socket.setdefaulttimeout(None)
368            self.assertIsNone(u.fp.fp.raw._sock.gettimeout())
369
370    @support.requires_resource('walltime')
371    def test_ftp_timeout(self):
372        with socket_helper.transient_internet(self.FTP_HOST):
373            u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
374            self.addCleanup(u.close)
375            self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60)
376
377
378if __name__ == "__main__":
379    unittest.main()
380