import unittest
from test import support
from test.support import socket_helper

import contextlib
import socket
import urllib.parse
import urllib.request
import os
import email.message
import time


support.requires('network')


class URLTimeoutTest(unittest.TestCase):
    # XXX this test doesn't seem to test anything useful.

    def setUp(self):
        # Remember the process-wide default so tearDown can put it back
        # instead of unconditionally resetting it to None.
        self._saved_timeout = socket.getdefaulttimeout()
        socket.setdefaulttimeout(support.INTERNET_TIMEOUT)

    def tearDown(self):
        socket.setdefaulttimeout(self._saved_timeout)

    def testURLread(self):
        # clear _opener global variable
        self.addCleanup(urllib.request.urlcleanup)

        domain = urllib.parse.urlparse(support.TEST_HTTP_URL).netloc
        with socket_helper.transient_internet(domain):
            # Use the response as a context manager so the underlying
            # socket is closed even if read() raises.
            with urllib.request.urlopen(support.TEST_HTTP_URL) as f:
                f.read()


class urlopenNetworkTests(unittest.TestCase):
    """Tests urllib.request.urlopen using the network.

    These tests are not exhaustive.  Assuming that testing using files does a
    good job overall of some of the basic interface features.  There are no
    tests exercising the optional 'data' and 'proxies' arguments.  No tests
    for transparent redirection have been written.

    setUp is not used for always constructing a connection to
    http://www.pythontest.net/ since there are a few tests that don't use that
    address and making a connection is expensive enough to warrant minimizing
    unneeded connections.

    """

    url = 'http://www.pythontest.net/'

    def setUp(self):
        # clear _opener global variable
        self.addCleanup(urllib.request.urlcleanup)

    @contextlib.contextmanager
    def urlopen(self, *args, **kwargs):
        """Open a URL under transient_internet() and close it on exit.

        All arguments are forwarded to urllib.request.urlopen(); the first
        positional argument is also used as the resource name passed to
        socket_helper.transient_internet().
        """
        resource = args[0]
        with socket_helper.transient_internet(resource):
            r = urllib.request.urlopen(*args, **kwargs)
            try:
                yield r
            finally:
                r.close()

    def test_basic(self):
        # Simple test expected to pass.
        with self.urlopen(self.url) as open_url:
            for attr in ("read", "readline", "readlines", "fileno", "close",
                         "info", "geturl"):
                self.assertTrue(hasattr(open_url, attr), "object returned from "
                                "urlopen lacks the %s attribute" % attr)
            self.assertTrue(open_url.read(), "calling 'read' failed")

    def test_readlines(self):
        # Test both readline and readlines.
        with self.urlopen(self.url) as open_url:
            self.assertIsInstance(open_url.readline(), bytes,
                                  "readline did not return a string")
            self.assertIsInstance(open_url.readlines(), list,
                                  "readlines did not return a list")

    def test_info(self):
        # Test 'info'.
        with self.urlopen(self.url) as open_url:
            info_obj = open_url.info()
            self.assertIsInstance(info_obj, email.message.Message,
                                  "object returned by 'info' is not an "
                                  "instance of email.message.Message")
            self.assertEqual(info_obj.get_content_subtype(), "html")

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        with self.urlopen(self.url) as open_url:
            gotten_url = open_url.geturl()
            self.assertEqual(gotten_url, self.url)

    def test_getcode(self):
        # test getcode() with the fancy opener to get 404 error codes
        URL = self.url + "XXXinvalidXXX"
        with socket_helper.transient_internet(URL):
            with self.assertWarns(DeprecationWarning):
                open_url = urllib.request.FancyURLopener().open(URL)
            try:
                code = open_url.getcode()
            finally:
                open_url.close()
            self.assertEqual(code, 404)

    def test_bad_address(self):
        # Make sure proper exception is raised when connecting to a bogus
        # address.

        # Given that both VeriSign and various ISPs have in
        # the past or are presently hijacking various invalid
        # domain name requests in an attempt to boost traffic
        # to their own sites, finding a domain name to use
        # for this test is difficult.  RFC2606 leads one to
        # believe that '.invalid' should work, but experience
        # seemed to indicate otherwise.  Single character
        # TLDs are likely to remain invalid, so this seems to
        # be the best choice. The trailing '.' prevents a
        # related problem: The normal DNS resolver appends
        # the domain names from the search path if there is
        # no '.' at the end, and if one of those domains
        # implements a '*' rule a result is returned.
        # However, none of this will prevent the test from
        # failing if the ISP hijacks all invalid domain
        # requests.  The real solution would be to be able to
        # parameterize the framework with a mock resolver.
        bogus_domain = "sadflkjsasf.i.nvali.d."
        try:
            socket.gethostbyname(bogus_domain)
        except OSError:
            # socket.gaierror is too narrow, since getaddrinfo() may also
            # fail with EAI_SYSTEM and ETIMEDOUT (seen on Ubuntu 13.04),
            # i.e. Python's TimeoutError.
            pass
        else:
            # This happens with some overzealous DNS providers such as OpenDNS
            self.skipTest("%r should not resolve for test to work" % bogus_domain)
        failure_explanation = ('opening an invalid URL did not raise OSError; '
                               'can be caused by a broken DNS server '
                               '(e.g. returns 404 or hijacks page)')
        with self.assertRaises(OSError, msg=failure_explanation):
            urllib.request.urlopen("http://{}/".format(bogus_domain))


class urlretrieveNetworkTests(unittest.TestCase):
    """Tests urllib.request.urlretrieve using the network."""

    # URL fetched by every test in this class.
    logo = "http://www.pythontest.net/"

    def setUp(self):
        # remove temporary files created by urlretrieve()
        self.addCleanup(urllib.request.urlcleanup)

    @contextlib.contextmanager
    def urlretrieve(self, *args, **kwargs):
        """Retrieve a URL under transient_internet() and unlink the file on exit.

        All arguments are forwarded to urllib.request.urlretrieve(); the
        first positional argument is also used as the resource name passed
        to socket_helper.transient_internet().  Yields the
        (file_location, info) pair returned by urlretrieve().
        """
        resource = args[0]
        with socket_helper.transient_internet(resource):
            file_location, info = urllib.request.urlretrieve(*args, **kwargs)
            try:
                yield file_location, info
            finally:
                support.unlink(file_location)

    def test_basic(self):
        # Test basic functionality.
        with self.urlretrieve(self.logo) as (file_location, info):
            self.assertTrue(os.path.exists(file_location), "file location returned by"
                            " urlretrieve is not a valid path")
            with open(file_location, 'rb') as f:
                self.assertTrue(f.read(), "reading from the file location returned"
                                " by urlretrieve failed")

    def test_specified_path(self):
        # Make sure that specifying the location of the file to write to works.
        with self.urlretrieve(self.logo,
                              support.TESTFN) as (file_location, info):
            self.assertEqual(file_location, support.TESTFN)
            self.assertTrue(os.path.exists(file_location))
            with open(file_location, 'rb') as f:
                self.assertTrue(f.read(), "reading from temporary file failed")

    def test_header(self):
        # Make sure header returned as 2nd value from urlretrieve is good.
        with self.urlretrieve(self.logo) as (file_location, info):
            self.assertIsInstance(info, email.message.Message,
                                  "info is not an instance of email.message.Message")

    def test_data_header(self):
        # The Date header must be present and in the HTTP date format.
        with self.urlretrieve(self.logo) as (file_location, fileheaders):
            datevalue = fileheaders.get('Date')
            # Fail with a clear message instead of letting strptime() raise
            # TypeError when the server sends no Date header.
            self.assertIsNotNone(datevalue, "response has no Date header")
            dateformat = '%a, %d %b %Y %H:%M:%S GMT'
            try:
                time.strptime(datevalue, dateformat)
            except ValueError:
                self.fail('Date value not in %r format' % dateformat)

    def test_reporthook(self):
        # The report hook must be called before the transfer starts and
        # then per block, always with a consistent block and total size.
        records = []

        def recording_reporthook(blocks, block_size, total_size):
            records.append((blocks, block_size, total_size))

        with self.urlretrieve(self.logo, reporthook=recording_reporthook) as (
                file_location, fileheaders):
            expected_size = int(fileheaders['Content-Length'])

        records_repr = repr(records)  # For use in error messages.
        self.assertGreater(len(records), 1, msg="There should always be two "
                           "calls; the first one before the transfer starts.")
        self.assertEqual(records[0][0], 0)
        self.assertGreater(records[0][1], 0,
                           msg="block size can't be 0 in %s" % records_repr)
        self.assertEqual(records[0][2], expected_size)
        self.assertEqual(records[-1][2], expected_size)

        block_sizes = {block_size for _, block_size, _ in records}
        self.assertEqual({records[0][1]}, block_sizes,
                         msg="block sizes in %s must be equal" % records_repr)
        self.assertGreaterEqual(records[-1][0]*records[0][1], expected_size,
                                msg="number of blocks * block size must be"
                                " >= total size in %s" % records_repr)


if __name__ == "__main__":
    unittest.main()