1import unittest 2from test import support 3from test.support import os_helper 4from test.support import socket_helper 5 6import contextlib 7import socket 8import urllib.parse 9import urllib.request 10import os 11import email.message 12import time 13 14 15support.requires('network') 16 17 18class URLTimeoutTest(unittest.TestCase): 19 # XXX this test doesn't seem to test anything useful. 20 21 def setUp(self): 22 socket.setdefaulttimeout(support.INTERNET_TIMEOUT) 23 24 def tearDown(self): 25 socket.setdefaulttimeout(None) 26 27 def testURLread(self): 28 # clear _opener global variable 29 self.addCleanup(urllib.request.urlcleanup) 30 31 domain = urllib.parse.urlparse(support.TEST_HTTP_URL).netloc 32 with socket_helper.transient_internet(domain): 33 f = urllib.request.urlopen(support.TEST_HTTP_URL) 34 f.read() 35 36 37class urlopenNetworkTests(unittest.TestCase): 38 """Tests urllib.request.urlopen using the network. 39 40 These tests are not exhaustive. Assuming that testing using files does a 41 good job overall of some of the basic interface features. There are no 42 tests exercising the optional 'data' and 'proxies' arguments. No tests 43 for transparent redirection have been written. 44 45 setUp is not used for always constructing a connection to 46 http://www.pythontest.net/ since there a few tests that don't use that address 47 and making a connection is expensive enough to warrant minimizing unneeded 48 connections. 49 50 """ 51 52 url = 'http://www.pythontest.net/' 53 54 def setUp(self): 55 # clear _opener global variable 56 self.addCleanup(urllib.request.urlcleanup) 57 58 @contextlib.contextmanager 59 def urlopen(self, *args, **kwargs): 60 resource = args[0] 61 with socket_helper.transient_internet(resource): 62 r = urllib.request.urlopen(*args, **kwargs) 63 try: 64 yield r 65 finally: 66 r.close() 67 68 def test_basic(self): 69 # Simple test expected to pass. 70 with self.urlopen(self.url) as open_url: 71 for attr in ("read", "readline", "readlines", "fileno", "close", 72 "info", "geturl"): 73 self.assertTrue(hasattr(open_url, attr), "object returned from " 74 "urlopen lacks the %s attribute" % attr) 75 self.assertTrue(open_url.read(), "calling 'read' failed") 76 77 def test_readlines(self): 78 # Test both readline and readlines. 79 with self.urlopen(self.url) as open_url: 80 self.assertIsInstance(open_url.readline(), bytes, 81 "readline did not return a string") 82 self.assertIsInstance(open_url.readlines(), list, 83 "readlines did not return a list") 84 85 def test_info(self): 86 # Test 'info'. 87 with self.urlopen(self.url) as open_url: 88 info_obj = open_url.info() 89 self.assertIsInstance(info_obj, email.message.Message, 90 "object returned by 'info' is not an " 91 "instance of email.message.Message") 92 self.assertEqual(info_obj.get_content_subtype(), "html") 93 94 def test_geturl(self): 95 # Make sure same URL as opened is returned by geturl. 96 with self.urlopen(self.url) as open_url: 97 gotten_url = open_url.geturl() 98 self.assertEqual(gotten_url, self.url) 99 100 def test_getcode(self): 101 # test getcode() with the fancy opener to get 404 error codes 102 URL = self.url + "XXXinvalidXXX" 103 with socket_helper.transient_internet(URL): 104 with self.assertWarns(DeprecationWarning): 105 open_url = urllib.request.FancyURLopener().open(URL) 106 try: 107 code = open_url.getcode() 108 finally: 109 open_url.close() 110 self.assertEqual(code, 404) 111 112 def test_bad_address(self): 113 # Make sure proper exception is raised when connecting to a bogus 114 # address. 115 116 # Given that both VeriSign and various ISPs have in 117 # the past or are presently hijacking various invalid 118 # domain name requests in an attempt to boost traffic 119 # to their own sites, finding a domain name to use 120 # for this test is difficult. RFC2606 leads one to 121 # believe that '.invalid' should work, but experience 122 # seemed to indicate otherwise. Single character 123 # TLDs are likely to remain invalid, so this seems to 124 # be the best choice. The trailing '.' prevents a 125 # related problem: The normal DNS resolver appends 126 # the domain names from the search path if there is 127 # no '.' the end and, and if one of those domains 128 # implements a '*' rule a result is returned. 129 # However, none of this will prevent the test from 130 # failing if the ISP hijacks all invalid domain 131 # requests. The real solution would be to be able to 132 # parameterize the framework with a mock resolver. 133 bogus_domain = "sadflkjsasf.i.nvali.d." 134 try: 135 socket.gethostbyname(bogus_domain) 136 except OSError: 137 # socket.gaierror is too narrow, since getaddrinfo() may also 138 # fail with EAI_SYSTEM and ETIMEDOUT (seen on Ubuntu 13.04), 139 # i.e. Python's TimeoutError. 140 pass 141 else: 142 # This happens with some overzealous DNS providers such as OpenDNS 143 self.skipTest("%r should not resolve for test to work" % bogus_domain) 144 failure_explanation = ('opening an invalid URL did not raise OSError; ' 145 'can be caused by a broken DNS server ' 146 '(e.g. returns 404 or hijacks page)') 147 with self.assertRaises(OSError, msg=failure_explanation): 148 urllib.request.urlopen("http://{}/".format(bogus_domain)) 149 150 151class urlretrieveNetworkTests(unittest.TestCase): 152 """Tests urllib.request.urlretrieve using the network.""" 153 154 def setUp(self): 155 # remove temporary files created by urlretrieve() 156 self.addCleanup(urllib.request.urlcleanup) 157 158 @contextlib.contextmanager 159 def urlretrieve(self, *args, **kwargs): 160 resource = args[0] 161 with socket_helper.transient_internet(resource): 162 file_location, info = urllib.request.urlretrieve(*args, **kwargs) 163 try: 164 yield file_location, info 165 finally: 166 os_helper.unlink(file_location) 167 168 def test_basic(self): 169 # Test basic functionality. 170 with self.urlretrieve(self.logo) as (file_location, info): 171 self.assertTrue(os.path.exists(file_location), "file location returned by" 172 " urlretrieve is not a valid path") 173 with open(file_location, 'rb') as f: 174 self.assertTrue(f.read(), "reading from the file location returned" 175 " by urlretrieve failed") 176 177 def test_specified_path(self): 178 # Make sure that specifying the location of the file to write to works. 179 with self.urlretrieve(self.logo, 180 os_helper.TESTFN) as (file_location, info): 181 self.assertEqual(file_location, os_helper.TESTFN) 182 self.assertTrue(os.path.exists(file_location)) 183 with open(file_location, 'rb') as f: 184 self.assertTrue(f.read(), "reading from temporary file failed") 185 186 def test_header(self): 187 # Make sure header returned as 2nd value from urlretrieve is good. 188 with self.urlretrieve(self.logo) as (file_location, info): 189 self.assertIsInstance(info, email.message.Message, 190 "info is not an instance of email.message.Message") 191 192 logo = "http://www.pythontest.net/" 193 194 def test_data_header(self): 195 with self.urlretrieve(self.logo) as (file_location, fileheaders): 196 datevalue = fileheaders.get('Date') 197 dateformat = '%a, %d %b %Y %H:%M:%S GMT' 198 try: 199 time.strptime(datevalue, dateformat) 200 except ValueError: 201 self.fail('Date value not in %r format' % dateformat) 202 203 def test_reporthook(self): 204 records = [] 205 206 def recording_reporthook(blocks, block_size, total_size): 207 records.append((blocks, block_size, total_size)) 208 209 with self.urlretrieve(self.logo, reporthook=recording_reporthook) as ( 210 file_location, fileheaders): 211 expected_size = int(fileheaders['Content-Length']) 212 213 records_repr = repr(records) # For use in error messages. 214 self.assertGreater(len(records), 1, msg="There should always be two " 215 "calls; the first one before the transfer starts.") 216 self.assertEqual(records[0][0], 0) 217 self.assertGreater(records[0][1], 0, 218 msg="block size can't be 0 in %s" % records_repr) 219 self.assertEqual(records[0][2], expected_size) 220 self.assertEqual(records[-1][2], expected_size) 221 222 block_sizes = {block_size for _, block_size, _ in records} 223 self.assertEqual({records[0][1]}, block_sizes, 224 msg="block sizes in %s must be equal" % records_repr) 225 self.assertGreaterEqual(records[-1][0]*records[0][1], expected_size, 226 msg="number of blocks * block size must be" 227 " >= total size in %s" % records_repr) 228 229 230if __name__ == "__main__": 231 unittest.main() 232