• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import unittest
2from test import support
3from test.support import socket_helper
4
5import contextlib
6import socket
7import urllib.parse
8import urllib.request
9import os
10import email.message
11import time
12
13
# Every test in this module talks to real hosts, so gate the whole module on
# the 'network' test resource (presumably the module is skipped when that
# resource is not enabled -- see test.support.requires to confirm).
support.requires('network')
15
16
class URLTimeoutTest(unittest.TestCase):
    """Read a known HTTP URL while a global socket timeout is in effect.

    XXX this test doesn't seem to test anything useful.
    """

    def setUp(self):
        # Remember the timeout that was active before the test so tearDown
        # can put it back rather than unconditionally resetting it to None.
        self._saved_timeout = socket.getdefaulttimeout()
        socket.setdefaulttimeout(support.INTERNET_TIMEOUT)

    def tearDown(self):
        socket.setdefaulttimeout(self._saved_timeout)

    def testURLread(self):
        # clear _opener global variable
        self.addCleanup(urllib.request.urlcleanup)

        domain = urllib.parse.urlparse(support.TEST_HTTP_URL).netloc
        with socket_helper.transient_internet(domain):
            # Use the response as a context manager so the underlying
            # connection is closed even when read() raises.
            with urllib.request.urlopen(support.TEST_HTTP_URL) as f:
                f.read()
34
35
class urlopenNetworkTests(unittest.TestCase):
    """Tests urllib.request.urlopen using the network.

    These tests are not exhaustive.  Assuming that testing using files does a
    good job overall of some of the basic interface features.  There are no
    tests exercising the optional 'data' and 'proxies' arguments.  No tests
    for transparent redirection have been written.

    setUp is not used for always constructing a connection to
    http://www.pythontest.net/ since there are a few tests that don't use
    that address and making a connection is expensive enough to warrant
    minimizing unneeded connections.

    """

    url = 'http://www.pythontest.net/'

    def setUp(self):
        # clear _opener global variable
        self.addCleanup(urllib.request.urlcleanup)

    @contextlib.contextmanager
    def urlopen(self, *args, **kwargs):
        """Open a URL with urllib.request.urlopen() and close it on exit.

        The first positional argument is the URL; it is also passed to
        socket_helper.transient_internet() as the resource.  All arguments
        are forwarded unchanged to urllib.request.urlopen().  Yields the
        opened response object.
        """
        resource = args[0]
        with socket_helper.transient_internet(resource):
            r = urllib.request.urlopen(*args, **kwargs)
            try:
                yield r
            finally:
                r.close()

    def test_basic(self):
        # Simple test expected to pass.
        with self.urlopen(self.url) as open_url:
            for attr in ("read", "readline", "readlines", "fileno", "close",
                         "info", "geturl"):
                self.assertTrue(hasattr(open_url, attr), "object returned from "
                                "urlopen lacks the %s attribute" % attr)
            self.assertTrue(open_url.read(), "calling 'read' failed")

    def test_readlines(self):
        # Test both readline and readlines.
        with self.urlopen(self.url) as open_url:
            # The response is a binary stream, so readline() yields bytes.
            self.assertIsInstance(open_url.readline(), bytes,
                                  "readline did not return bytes")
            self.assertIsInstance(open_url.readlines(), list,
                                  "readlines did not return a list")

    def test_info(self):
        # Test 'info'.
        with self.urlopen(self.url) as open_url:
            info_obj = open_url.info()
            self.assertIsInstance(info_obj, email.message.Message,
                                  "object returned by 'info' is not an "
                                  "instance of email.message.Message")
            self.assertEqual(info_obj.get_content_subtype(), "html")

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        with self.urlopen(self.url) as open_url:
            gotten_url = open_url.geturl()
            self.assertEqual(gotten_url, self.url)

    def test_getcode(self):
        # test getcode() with the fancy opener to get 404 error codes
        URL = self.url + "XXXinvalidXXX"
        with socket_helper.transient_internet(URL):
            # Only the construction/open call is expected to warn; keep the
            # assertWarns scope tight around it.
            with self.assertWarns(DeprecationWarning):
                open_url = urllib.request.FancyURLopener().open(URL)
            try:
                code = open_url.getcode()
            finally:
                open_url.close()
            self.assertEqual(code, 404)

    def test_bad_address(self):
        # Make sure proper exception is raised when connecting to a bogus
        # address.

        # Given that both VeriSign and various ISPs have in
        # the past or are presently hijacking various invalid
        # domain name requests in an attempt to boost traffic
        # to their own sites, finding a domain name to use
        # for this test is difficult.  RFC2606 leads one to
        # believe that '.invalid' should work, but experience
        # seemed to indicate otherwise.  Single character
        # TLDs are likely to remain invalid, so this seems to
        # be the best choice. The trailing '.' prevents a
        # related problem: The normal DNS resolver appends
        # the domain names from the search path if there is
        # no '.' at the end, and if one of those domains
        # implements a '*' rule a result is returned.
        # However, none of this will prevent the test from
        # failing if the ISP hijacks all invalid domain
        # requests.  The real solution would be to be able to
        # parameterize the framework with a mock resolver.
        bogus_domain = "sadflkjsasf.i.nvali.d."
        try:
            socket.gethostbyname(bogus_domain)
        except OSError:
            # socket.gaierror is too narrow, since getaddrinfo() may also
            # fail with EAI_SYSTEM and ETIMEDOUT (seen on Ubuntu 13.04),
            # i.e. Python's TimeoutError.
            pass
        else:
            # This happens with some overzealous DNS providers such as OpenDNS
            self.skipTest("%r should not resolve for test to work" % bogus_domain)
        failure_explanation = ('opening an invalid URL did not raise OSError; '
                               'can be caused by a broken DNS server '
                               '(e.g. returns 404 or hijacks page)')
        with self.assertRaises(OSError, msg=failure_explanation):
            # If the open unexpectedly succeeds, close the response before
            # assertRaises reports the failure so no socket is leaked.
            with urllib.request.urlopen("http://{}/".format(bogus_domain)):
                pass
148
149
class urlretrieveNetworkTests(unittest.TestCase):
    """Tests urllib.request.urlretrieve using the network."""

    # URL retrieved by every test in this class.
    logo = "http://www.pythontest.net/"

    def setUp(self):
        # remove temporary files created by urlretrieve()
        self.addCleanup(urllib.request.urlcleanup)

    @contextlib.contextmanager
    def urlretrieve(self, *args, **kwargs):
        """Retrieve a URL to a local file and delete that file on exit.

        The first positional argument is the URL; it is also passed to
        socket_helper.transient_internet() as the resource.  All arguments
        are forwarded unchanged to urllib.request.urlretrieve().  Yields the
        ``(file_location, info)`` pair that urlretrieve() returned.
        """
        resource = args[0]
        with socket_helper.transient_internet(resource):
            file_location, info = urllib.request.urlretrieve(*args, **kwargs)
            try:
                yield file_location, info
            finally:
                support.unlink(file_location)

    def test_basic(self):
        # Test basic functionality.
        with self.urlretrieve(self.logo) as (file_location, info):
            self.assertTrue(os.path.exists(file_location), "file location returned by"
                            " urlretrieve is not a valid path")
            with open(file_location, 'rb') as f:
                self.assertTrue(f.read(), "reading from the file location returned"
                                " by urlretrieve failed")

    def test_specified_path(self):
        # Make sure that specifying the location of the file to write to works.
        with self.urlretrieve(self.logo,
                              support.TESTFN) as (file_location, info):
            self.assertEqual(file_location, support.TESTFN)
            self.assertTrue(os.path.exists(file_location))
            with open(file_location, 'rb') as f:
                self.assertTrue(f.read(), "reading from temporary file failed")

    def test_header(self):
        # Make sure header returned as 2nd value from urlretrieve is good.
        with self.urlretrieve(self.logo) as (file_location, info):
            self.assertIsInstance(info, email.message.Message,
                                  "info is not an instance of email.message.Message")

    def test_data_header(self):
        with self.urlretrieve(self.logo) as (file_location, fileheaders):
            datevalue = fileheaders.get('Date')
            dateformat = '%a, %d %b %Y %H:%M:%S GMT'
            # A missing header would hand None to strptime(), which raises
            # TypeError and surfaces as a test error; report it as a plain
            # failure with a clear message instead.
            self.assertIsNotNone(datevalue, "response has no Date header")
            try:
                time.strptime(datevalue, dateformat)
            except ValueError:
                self.fail('Date value not in %r format' % dateformat)

    def test_reporthook(self):
        records = []

        def recording_reporthook(blocks, block_size, total_size):
            records.append((blocks, block_size, total_size))

        with self.urlretrieve(self.logo, reporthook=recording_reporthook) as (
                file_location, fileheaders):
            # Message lookups return None for absent headers, and int(None)
            # would raise TypeError; fail explicitly when the header is gone.
            self.assertIn('Content-Length', fileheaders,
                          "response has no Content-Length header")
            expected_size = int(fileheaders['Content-Length'])

        records_repr = repr(records)  # For use in error messages.
        self.assertGreater(len(records), 1, msg="There should always be two "
                           "calls; the first one before the transfer starts.")
        # First call happens before any block has been transferred.
        self.assertEqual(records[0][0], 0)
        self.assertGreater(records[0][1], 0,
                           msg="block size can't be 0 in %s" % records_repr)
        self.assertEqual(records[0][2], expected_size)
        self.assertEqual(records[-1][2], expected_size)

        block_sizes = {block_size for _, block_size, _ in records}
        self.assertEqual({records[0][1]}, block_sizes,
                         msg="block sizes in %s must be equal" % records_repr)
        self.assertGreaterEqual(records[-1][0]*records[0][1], expected_size,
                                msg="number of blocks * block size must be"
                                " >= total size in %s" % records_repr)
227
228
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
231