• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import unittest
2from test import support
3
4import contextlib
5import socket
6import urllib.request
7import os
8import email.message
9import time
10
11
# The test classes below open connections to remote hosts, so require the
# 'network' resource up front, before any of them is defined.
support.requires('network')
13
14class URLTimeoutTest(unittest.TestCase):
15    # XXX this test doesn't seem to test anything useful.
16
17    TIMEOUT = 30.0
18
19    def setUp(self):
20        socket.setdefaulttimeout(self.TIMEOUT)
21
22    def tearDown(self):
23        socket.setdefaulttimeout(None)
24
25    def testURLread(self):
26        with support.transient_internet("www.example.com"):
27            f = urllib.request.urlopen("http://www.example.com/")
28            x = f.read()
29
30
class urlopenNetworkTests(unittest.TestCase):
    """Tests urllib.request.urlopen using the network.

    These tests are not exhaustive.  Assuming that testing using files does a
    good job overall of some of the basic interface features.  There are no
    tests exercising the optional 'data' and 'proxies' arguments.  No tests
    for transparent redirection have been written.

    setUp is not used for always constructing a connection to
    http://www.pythontest.net/ since there are a few tests that don't use
    that address and making a connection is expensive enough to warrant
    minimizing unneeded connections.

    """

    # Base URL opened by most tests in this class.
    url = 'http://www.pythontest.net/'

    @contextlib.contextmanager
    def urlopen(self, *args, **kwargs):
        """Open a URL and yield the response, closing it on exit.

        All arguments are forwarded unchanged to urllib.request.urlopen().
        The first positional argument (the URL) is additionally passed to
        support.transient_internet() as the resource name, so it must be
        given positionally.
        """
        resource = args[0]
        with support.transient_internet(resource):
            r = urllib.request.urlopen(*args, **kwargs)
            try:
                yield r
            finally:
                r.close()

    def test_basic(self):
        # Simple test expected to pass.
        with self.urlopen(self.url) as open_url:
            for attr in ("read", "readline", "readlines", "fileno", "close",
                         "info", "geturl"):
                self.assertTrue(hasattr(open_url, attr), "object returned from "
                                "urlopen lacks the %s attribute" % attr)
            self.assertTrue(open_url.read(), "calling 'read' failed")

    def test_readlines(self):
        # Test both readline and readlines.
        with self.urlopen(self.url) as open_url:
            # The response is a binary stream, so a line is a bytes object.
            self.assertIsInstance(open_url.readline(), bytes,
                                  "readline did not return bytes")
            self.assertIsInstance(open_url.readlines(), list,
                                  "readlines did not return a list")

    def test_info(self):
        # Test 'info'.
        with self.urlopen(self.url) as open_url:
            info_obj = open_url.info()
            self.assertIsInstance(info_obj, email.message.Message,
                                  "object returned by 'info' is not an "
                                  "instance of email.message.Message")
            self.assertEqual(info_obj.get_content_subtype(), "html")

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        with self.urlopen(self.url) as open_url:
            gotten_url = open_url.geturl()
            self.assertEqual(gotten_url, self.url)

    def test_getcode(self):
        # test getcode() with the fancy opener to get 404 error codes
        URL = self.url + "XXXinvalidXXX"
        with support.transient_internet(URL):
            with self.assertWarns(DeprecationWarning):
                open_url = urllib.request.FancyURLopener().open(URL)
            try:
                code = open_url.getcode()
            finally:
                open_url.close()
            self.assertEqual(code, 404)

    def test_bad_address(self):
        # Make sure proper exception is raised when connecting to a bogus
        # address.

        # Given that both VeriSign and various ISPs have in
        # the past or are presently hijacking various invalid
        # domain name requests in an attempt to boost traffic
        # to their own sites, finding a domain name to use
        # for this test is difficult.  RFC2606 leads one to
        # believe that '.invalid' should work, but experience
        # seemed to indicate otherwise.  Single character
        # TLDs are likely to remain invalid, so this seems to
        # be the best choice. The trailing '.' prevents a
        # related problem: The normal DNS resolver appends
        # the domain names from the search path if there is
        # no '.' at the end, and if one of those domains
        # implements a '*' rule a result is returned.
        # However, none of this will prevent the test from
        # failing if the ISP hijacks all invalid domain
        # requests.  The real solution would be to be able to
        # parameterize the framework with a mock resolver.
        bogus_domain = "sadflkjsasf.i.nvali.d."
        try:
            socket.gethostbyname(bogus_domain)
        except OSError:
            # socket.gaierror is too narrow, since getaddrinfo() may also
            # fail with EAI_SYSTEM and ETIMEDOUT (seen on Ubuntu 13.04),
            # i.e. Python's TimeoutError.
            pass
        else:
            # This happens with some overzealous DNS providers such as OpenDNS
            self.skipTest("%r should not resolve for test to work" % bogus_domain)
        failure_explanation = ('opening an invalid URL did not raise OSError; '
                               'can be caused by a broken DNS server '
                               '(e.g. returns 404 or hijacks page)')
        with self.assertRaises(OSError, msg=failure_explanation):
            # If the open unexpectedly succeeds, the 'with' block still
            # closes the response before assertRaises reports the failure.
            with urllib.request.urlopen("http://{}/".format(bogus_domain)):
                pass
139
140
141class urlretrieveNetworkTests(unittest.TestCase):
142    """Tests urllib.request.urlretrieve using the network."""
143
144    @contextlib.contextmanager
145    def urlretrieve(self, *args, **kwargs):
146        resource = args[0]
147        with support.transient_internet(resource):
148            file_location, info = urllib.request.urlretrieve(*args, **kwargs)
149            try:
150                yield file_location, info
151            finally:
152                support.unlink(file_location)
153
154    def test_basic(self):
155        # Test basic functionality.
156        with self.urlretrieve(self.logo) as (file_location, info):
157            self.assertTrue(os.path.exists(file_location), "file location returned by"
158                            " urlretrieve is not a valid path")
159            with open(file_location, 'rb') as f:
160                self.assertTrue(f.read(), "reading from the file location returned"
161                                " by urlretrieve failed")
162
163    def test_specified_path(self):
164        # Make sure that specifying the location of the file to write to works.
165        with self.urlretrieve(self.logo,
166                              support.TESTFN) as (file_location, info):
167            self.assertEqual(file_location, support.TESTFN)
168            self.assertTrue(os.path.exists(file_location))
169            with open(file_location, 'rb') as f:
170                self.assertTrue(f.read(), "reading from temporary file failed")
171
172    def test_header(self):
173        # Make sure header returned as 2nd value from urlretrieve is good.
174        with self.urlretrieve(self.logo) as (file_location, info):
175            self.assertIsInstance(info, email.message.Message,
176                                  "info is not an instance of email.message.Message")
177
178    logo = "http://www.pythontest.net/"
179
180    def test_data_header(self):
181        with self.urlretrieve(self.logo) as (file_location, fileheaders):
182            datevalue = fileheaders.get('Date')
183            dateformat = '%a, %d %b %Y %H:%M:%S GMT'
184            try:
185                time.strptime(datevalue, dateformat)
186            except ValueError:
187                self.fail('Date value not in %r format' % dateformat)
188
189    def test_reporthook(self):
190        records = []
191        def recording_reporthook(blocks, block_size, total_size):
192            records.append((blocks, block_size, total_size))
193
194        with self.urlretrieve(self.logo, reporthook=recording_reporthook) as (
195                file_location, fileheaders):
196            expected_size = int(fileheaders['Content-Length'])
197
198        records_repr = repr(records)  # For use in error messages.
199        self.assertGreater(len(records), 1, msg="There should always be two "
200                           "calls; the first one before the transfer starts.")
201        self.assertEqual(records[0][0], 0)
202        self.assertGreater(records[0][1], 0,
203                           msg="block size can't be 0 in %s" % records_repr)
204        self.assertEqual(records[0][2], expected_size)
205        self.assertEqual(records[-1][2], expected_size)
206
207        block_sizes = {block_size for _, block_size, _ in records}
208        self.assertEqual({records[0][1]}, block_sizes,
209                         msg="block sizes in %s must be equal" % records_repr)
210        self.assertGreaterEqual(records[-1][0]*records[0][1], expected_size,
211                                msg="number of blocks * block size must be"
212                                " >= total size in %s" % records_repr)
213
214
if __name__ == "__main__":
    # Allow running this test module directly as a script.
    unittest.main()
217