• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import unittest
2from test import support
3from test.test_urllib2 import sanepathname2url
4
5import os
6import socket
7import urllib.error
8import urllib.request
9import sys
10
# Declare that every test in this module needs the "network" test resource.
# NOTE(review): presumably test.support refuses to run the module when that
# resource is not enabled -- confirm against test.support.requires().
support.requires("network")

# Timeout handed to the opener by OtherNetworkTests._test_urls().
TIMEOUT = 60  # seconds
14
15
16def _retry_thrice(func, exc, *args, **kwargs):
17    for i in range(3):
18        try:
19            return func(*args, **kwargs)
20        except exc as e:
21            last_exc = e
22            continue
23    raise last_exc
24
def _wrap_with_retry_thrice(func, exc):
    """Return a callable that runs ``func`` through ``_retry_thrice``.

    The returned function forwards its positional and keyword arguments to
    ``func`` and retries when ``exc`` is raised.
    """
    def wrapped(*call_args, **call_kwargs):
        return _retry_thrice(func, exc, *call_args, **call_kwargs)

    return wrapped
29
# bpo-35411: the FTP tests of test_urllib2net randomly fail with
# "425 Security: Bad IP connecting" on Travis CI, so they are skipped
# whenever the TRAVIS environment variable is present.
skip_ftp_test_on_travis = unittest.skipIf(
    'TRAVIS' in os.environ,
    'bpo-35411: skip FTP test on Travis CI',
)
35
36
# Connections to remote hosts are flaky, so the module-level urlopen used
# by these tests gets up to three attempts whenever URLError is raised.
_urlopen_with_retry = _wrap_with_retry_thrice(
    urllib.request.urlopen, urllib.error.URLError)
41
42
class AuthTests(unittest.TestCase):
    """Tests urllib2 authentication features.

    The class currently defines no active test methods; its only test is
    kept as commented-out code following this docstring.
    """

## Disabled at the moment since there is no page under python.org which
## could be used to test HTTP authentication.
48#
49#    def test_basic_auth(self):
50#        import http.client
51#
52#        test_url = "http://www.python.org/test/test_urllib2/basic_auth"
53#        test_hostport = "www.python.org"
54#        test_realm = 'Test Realm'
55#        test_user = 'test.test_urllib2net'
56#        test_password = 'blah'
57#
58#        # failure
59#        try:
60#            _urlopen_with_retry(test_url)
61#        except urllib2.HTTPError, exc:
62#            self.assertEqual(exc.code, 401)
63#        else:
64#            self.fail("urlopen() should have failed with 401")
65#
66#        # success
67#        auth_handler = urllib2.HTTPBasicAuthHandler()
68#        auth_handler.add_password(test_realm, test_hostport,
69#                                  test_user, test_password)
70#        opener = urllib2.build_opener(auth_handler)
71#        f = opener.open('http://localhost/')
72#        response = _urlopen_with_retry("http://www.python.org/")
73#
74#        # The 'userinfo' URL component is deprecated by RFC 3986 for security
75#        # reasons, let's not implement it!  (it's already implemented for proxy
76#        # specification strings (that is, URLs or authorities specifying a
77#        # proxy), so we must keep that)
78#        self.assertRaises(http.client.InvalidURL,
79#                          urllib2.urlopen, "http://evil:thing@example.com")
80
81
class CloseSocketTest(unittest.TestCase):
    """Check that closing a urlopen() response also closes what it wraps."""

    def test_close(self):
        # Calling .close() on a urllib response object should close the
        # underlying socket as well.
        target = support.TEST_HTTP_URL
        with support.transient_internet(target):
            resp = _urlopen_with_retry(target)
            inner_fp = resp.fp
            # Open before close() ...
            self.assertFalse(inner_fp.closed)
            resp.close()
            # ... and closed afterwards.
            self.assertTrue(inner_fp.closed)
94
class OtherNetworkTests(unittest.TestCase):
    """Smoke tests that fetch ftp:, file: and http: URLs via urllib.request.

    Responses opened by the tests are closed again, including on failure
    paths, so the tests do not leave sockets or file objects open.
    """

    def setUp(self):
        if 0:  # for debugging
            import logging
            logger = logging.getLogger("test_urllib2net")
            logger.addHandler(logging.StreamHandler())

    # XXX The rest of these tests aren't very good -- they don't check much.
    # They do sometimes catch some major disasters, though.

    @skip_ftp_test_on_travis
    def test_ftp(self):
        urls = [
            'ftp://www.pythontest.net/README',
            ('ftp://www.pythontest.net/non-existent-file',
             None, urllib.error.URLError),
            ]
        self._test_urls(urls, self._extra_handlers())

    def test_file(self):
        TESTFN = support.TESTFN
        f = open(TESTFN, 'w')
        try:
            # "with f" closes the file even when the write fails, so the
            # os.remove() in the finally clause never runs against a file
            # that is still open.
            with f:
                f.write('hi there\n')
            urls = [
                'file:' + sanepathname2url(os.path.abspath(TESTFN)),
                ('file:///nonsensename/etc/passwd', None,
                 urllib.error.URLError),
                ]
            self._test_urls(urls, self._extra_handlers(), retry=True)
        finally:
            os.remove(TESTFN)

        # A URL without a scheme is rejected before any I/O happens.
        self.assertRaises(ValueError, urllib.request.urlopen,
                          './relative_path/to/file')

    # XXX Following test depends on machine configurations that are internal
    # to CNRI.  Need to set up a public server with the right authentication
    # configuration for test purposes.

##     def test_cnri(self):
##         if socket.gethostname() == 'bitdiddle':
##             localhost = 'bitdiddle.cnri.reston.va.us'
##         elif socket.gethostname() == 'bitdiddle.concentric.net':
##             localhost = 'localhost'
##         else:
##             localhost = None
##         if localhost is not None:
##             urls = [
##                 'file://%s/etc/passwd' % localhost,
##                 'http://%s/simple/' % localhost,
##                 'http://%s/digest/' % localhost,
##                 'http://%s/not/found.h' % localhost,
##                 ]

##             bauth = HTTPBasicAuthHandler()
##             bauth.add_password('basic_test_realm', localhost, 'jhylton',
##                                'password')
##             dauth = HTTPDigestAuthHandler()
##             dauth.add_password('digest_test_realm', localhost, 'jhylton',
##                                'password')

##             self._test_urls(urls, self._extra_handlers()+[bauth, dauth])

    def test_urlwithfrag(self):
        urlwith_frag = "http://www.pythontest.net/index.html#frag"
        with support.transient_internet(urlwith_frag):
            req = urllib.request.Request(urlwith_frag)
            # Close the response once the check is done.
            with urllib.request.urlopen(req) as res:
                self.assertEqual(res.geturl(),
                        "http://www.pythontest.net/index.html#frag")

    def test_redirect_url_withfrag(self):
        redirect_url_with_frag = "http://www.pythontest.net/redir/with_frag/"
        with support.transient_internet(redirect_url_with_frag):
            req = urllib.request.Request(redirect_url_with_frag)
            # Close the response once the check is done.
            with urllib.request.urlopen(req) as res:
                self.assertEqual(res.geturl(),
                        "http://www.pythontest.net/elsewhere/#frag")

    def test_custom_headers(self):
        url = support.TEST_HTTP_URL
        with support.transient_internet(url):
            opener = urllib.request.build_opener()
            request = urllib.request.Request(url)
            self.assertFalse(request.header_items())
            # Only the headers recorded on the request matter here, so each
            # response is closed as soon as it has been opened.
            opener.open(request).close()
            self.assertTrue(request.header_items())
            self.assertTrue(request.has_header('User-agent'))
            request.add_header('User-Agent','Test-Agent')
            opener.open(request).close()
            self.assertEqual(request.get_header('User-agent'),'Test-Agent')

    @unittest.skip('XXX: http://www.imdb.com is gone')
    def test_sites_no_connection_close(self):
        # Some sites do not send Connection: close header.
        # Verify that those work properly. (#issue12576)

        URL = 'http://www.imdb.com' # mangles Connection:close

        with support.transient_internet(URL):
            try:
                with urllib.request.urlopen(URL) as res:
                    pass
            except ValueError:
                # Adjacent literals instead of a backslash continuation, so
                # the message no longer contains a run of indentation spaces.
                self.fail("urlopen failed for site not sending "
                          "Connection:close")
            else:
                self.assertTrue(res)

            with urllib.request.urlopen(URL) as req:
                res = req.read()
            self.assertTrue(res)

    def _test_urls(self, urls, handlers, retry=True):
        """Open every entry of ``urls`` and read the whole response.

        urls -- iterable whose items are either a URL string or a
            ``(url, req, expected_err)`` tuple.  ``req`` is passed as the
            second positional argument to the opener's ``open()``.
            ``expected_err`` is the exception class (or tuple of classes)
            an OSError raised while opening must be an instance of.
        handlers -- handler instances given to urllib.request.build_opener().
        retry -- when true, opening is attempted up to three times on
            urllib.error.URLError.

        An OSError raised while opening is re-raised when the entry has no
        expected error.  When opening succeeds the body is read whether or
        not an error was expected; a socket.timeout during the read is only
        reported on stderr.  The response is always closed.
        """
        import time
        import logging
        debug = logging.getLogger("test_urllib2").debug

        urlopen = urllib.request.build_opener(*handlers).open
        if retry:
            urlopen = _wrap_with_retry_thrice(urlopen, urllib.error.URLError)

        for url in urls:
            with self.subTest(url=url):
                if isinstance(url, tuple):
                    url, req, expected_err = url
                else:
                    req = expected_err = None

                with support.transient_internet(url):
                    try:
                        f = urlopen(url, req, TIMEOUT)
                    # urllib.error.URLError is a subclass of OSError
                    except OSError as err:
                        if expected_err:
                            msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
                                   (expected_err, url, req, type(err), err))
                            self.assertIsInstance(err, expected_err, msg)
                        else:
                            raise
                    else:
                        try:
                            with support.time_out, \
                                 support.socket_peer_reset, \
                                 support.ioerror_peer_reset:
                                buf = f.read()
                                # Let logging do the formatting lazily.
                                debug("read %d bytes", len(buf))
                        except socket.timeout:
                            print("<timeout: %s>" % url, file=sys.stderr)
                        finally:
                            # Close the response even if the read raised
                            # something other than socket.timeout.
                            f.close()
                time.sleep(0.1)

    def _extra_handlers(self):
        """Return the extra handlers used by the URL tests.

        The list holds a single CacheFTPHandler with its timeout set to 1;
        clearing its cache is registered as a test cleanup.
        """
        handlers = []

        cfh = urllib.request.CacheFTPHandler()
        self.addCleanup(cfh.clear_cache)
        cfh.setTimeout(1)
        handlers.append(cfh)

        return handlers
257
258
class TimeoutTest(unittest.TestCase):
    """Check the timeout set on the socket behind a urlopen() response.

    Each scheme (HTTP and FTP) is exercised four ways: no timeout at all,
    a process-wide default timeout, an explicit ``timeout=None`` overriding
    the default, and an explicit numeric timeout.
    """

    FTP_HOST = 'ftp://www.pythontest.net/'

    @staticmethod
    def _http_socket(response):
        # Follow the private attribute chain used to reach the socket of an
        # HTTP response.
        return response.fp.raw._sock

    @staticmethod
    def _ftp_socket(response):
        # FTP responses carry one more wrapping layer than HTTP ones.
        return response.fp.fp.raw._sock

    def test_http_basic(self):
        self.assertIsNone(socket.getdefaulttimeout())
        target = support.TEST_HTTP_URL
        with support.transient_internet(target, timeout=None):
            response = _urlopen_with_retry(target)
            self.addCleanup(response.close)
            self.assertIsNone(self._http_socket(response).gettimeout())

    def test_http_default_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        target = support.TEST_HTTP_URL
        with support.transient_internet(target):
            socket.setdefaulttimeout(60)
            try:
                response = _urlopen_with_retry(target)
                self.addCleanup(response.close)
            finally:
                # Restore the global default before asserting anything.
                socket.setdefaulttimeout(None)
            self.assertEqual(self._http_socket(response).gettimeout(), 60)

    def test_http_no_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        target = support.TEST_HTTP_URL
        with support.transient_internet(target):
            socket.setdefaulttimeout(60)
            try:
                # An explicit timeout=None must win over the default.
                response = _urlopen_with_retry(target, timeout=None)
                self.addCleanup(response.close)
            finally:
                socket.setdefaulttimeout(None)
            self.assertIsNone(self._http_socket(response).gettimeout())

    def test_http_timeout(self):
        target = support.TEST_HTTP_URL
        with support.transient_internet(target):
            response = _urlopen_with_retry(target, timeout=120)
            self.addCleanup(response.close)
            self.assertEqual(self._http_socket(response).gettimeout(), 120)

    @skip_ftp_test_on_travis
    def test_ftp_basic(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with support.transient_internet(self.FTP_HOST, timeout=None):
            response = _urlopen_with_retry(self.FTP_HOST)
            self.addCleanup(response.close)
            self.assertIsNone(self._ftp_socket(response).gettimeout())

    @skip_ftp_test_on_travis
    def test_ftp_default_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with support.transient_internet(self.FTP_HOST):
            socket.setdefaulttimeout(60)
            try:
                response = _urlopen_with_retry(self.FTP_HOST)
                self.addCleanup(response.close)
            finally:
                # Restore the global default before asserting anything.
                socket.setdefaulttimeout(None)
            self.assertEqual(self._ftp_socket(response).gettimeout(), 60)

    @skip_ftp_test_on_travis
    def test_ftp_no_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with support.transient_internet(self.FTP_HOST):
            socket.setdefaulttimeout(60)
            try:
                # An explicit timeout=None must win over the default.
                response = _urlopen_with_retry(self.FTP_HOST, timeout=None)
                self.addCleanup(response.close)
            finally:
                socket.setdefaulttimeout(None)
            self.assertIsNone(self._ftp_socket(response).gettimeout())

    @skip_ftp_test_on_travis
    def test_ftp_timeout(self):
        with support.transient_internet(self.FTP_HOST):
            response = _urlopen_with_retry(self.FTP_HOST, timeout=60)
            self.addCleanup(response.close)
            self.assertEqual(self._ftp_socket(response).gettimeout(), 60)
339
340
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
343