1import unittest 2from test import support 3from test.test_urllib2 import sanepathname2url 4 5import os 6import socket 7import urllib.error 8import urllib.request 9import sys 10 11support.requires("network") 12 13TIMEOUT = 60 # seconds 14 15 16def _retry_thrice(func, exc, *args, **kwargs): 17 for i in range(3): 18 try: 19 return func(*args, **kwargs) 20 except exc as e: 21 last_exc = e 22 continue 23 raise last_exc 24 25def _wrap_with_retry_thrice(func, exc): 26 def wrapped(*args, **kwargs): 27 return _retry_thrice(func, exc, *args, **kwargs) 28 return wrapped 29 30# bpo-35411: FTP tests of test_urllib2net randomly fail 31# with "425 Security: Bad IP connecting" on Travis CI 32skip_ftp_test_on_travis = unittest.skipIf('TRAVIS' in os.environ, 33 'bpo-35411: skip FTP test ' 34 'on Travis CI') 35 36 37# Connecting to remote hosts is flaky. Make it more robust by retrying 38# the connection several times. 39_urlopen_with_retry = _wrap_with_retry_thrice(urllib.request.urlopen, 40 urllib.error.URLError) 41 42 43class AuthTests(unittest.TestCase): 44 """Tests urllib2 authentication features.""" 45 46## Disabled at the moment since there is no page under python.org which 47## could be used to HTTP authentication. 48# 49# def test_basic_auth(self): 50# import http.client 51# 52# test_url = "http://www.python.org/test/test_urllib2/basic_auth" 53# test_hostport = "www.python.org" 54# test_realm = 'Test Realm' 55# test_user = 'test.test_urllib2net' 56# test_password = 'blah' 57# 58# # failure 59# try: 60# _urlopen_with_retry(test_url) 61# except urllib2.HTTPError, exc: 62# self.assertEqual(exc.code, 401) 63# else: 64# self.fail("urlopen() should have failed with 401") 65# 66# # success 67# auth_handler = urllib2.HTTPBasicAuthHandler() 68# auth_handler.add_password(test_realm, test_hostport, 69# test_user, test_password) 70# opener = urllib2.build_opener(auth_handler) 71# f = opener.open('http://localhost/') 72# response = _urlopen_with_retry("http://www.python.org/") 73# 74# # The 'userinfo' URL component is deprecated by RFC 3986 for security 75# # reasons, let's not implement it! (it's already implemented for proxy 76# # specification strings (that is, URLs or authorities specifying a 77# # proxy), so we must keep that) 78# self.assertRaises(http.client.InvalidURL, 79# urllib2.urlopen, "http://evil:thing@example.com") 80 81 82class CloseSocketTest(unittest.TestCase): 83 84 def test_close(self): 85 # calling .close() on urllib2's response objects should close the 86 # underlying socket 87 url = support.TEST_HTTP_URL 88 with support.transient_internet(url): 89 response = _urlopen_with_retry(url) 90 sock = response.fp 91 self.assertFalse(sock.closed) 92 response.close() 93 self.assertTrue(sock.closed) 94 95class OtherNetworkTests(unittest.TestCase): 96 def setUp(self): 97 if 0: # for debugging 98 import logging 99 logger = logging.getLogger("test_urllib2net") 100 logger.addHandler(logging.StreamHandler()) 101 102 # XXX The rest of these tests aren't very good -- they don't check much. 103 # They do sometimes catch some major disasters, though. 104 105 @skip_ftp_test_on_travis 106 def test_ftp(self): 107 urls = [ 108 'ftp://www.pythontest.net/README', 109 ('ftp://www.pythontest.net/non-existent-file', 110 None, urllib.error.URLError), 111 ] 112 self._test_urls(urls, self._extra_handlers()) 113 114 def test_file(self): 115 TESTFN = support.TESTFN 116 f = open(TESTFN, 'w') 117 try: 118 f.write('hi there\n') 119 f.close() 120 urls = [ 121 'file:' + sanepathname2url(os.path.abspath(TESTFN)), 122 ('file:///nonsensename/etc/passwd', None, 123 urllib.error.URLError), 124 ] 125 self._test_urls(urls, self._extra_handlers(), retry=True) 126 finally: 127 os.remove(TESTFN) 128 129 self.assertRaises(ValueError, urllib.request.urlopen,'./relative_path/to/file') 130 131 # XXX Following test depends on machine configurations that are internal 132 # to CNRI. Need to set up a public server with the right authentication 133 # configuration for test purposes. 134 135## def test_cnri(self): 136## if socket.gethostname() == 'bitdiddle': 137## localhost = 'bitdiddle.cnri.reston.va.us' 138## elif socket.gethostname() == 'bitdiddle.concentric.net': 139## localhost = 'localhost' 140## else: 141## localhost = None 142## if localhost is not None: 143## urls = [ 144## 'file://%s/etc/passwd' % localhost, 145## 'http://%s/simple/' % localhost, 146## 'http://%s/digest/' % localhost, 147## 'http://%s/not/found.h' % localhost, 148## ] 149 150## bauth = HTTPBasicAuthHandler() 151## bauth.add_password('basic_test_realm', localhost, 'jhylton', 152## 'password') 153## dauth = HTTPDigestAuthHandler() 154## dauth.add_password('digest_test_realm', localhost, 'jhylton', 155## 'password') 156 157## self._test_urls(urls, self._extra_handlers()+[bauth, dauth]) 158 159 def test_urlwithfrag(self): 160 urlwith_frag = "http://www.pythontest.net/index.html#frag" 161 with support.transient_internet(urlwith_frag): 162 req = urllib.request.Request(urlwith_frag) 163 res = urllib.request.urlopen(req) 164 self.assertEqual(res.geturl(), 165 "http://www.pythontest.net/index.html#frag") 166 167 def test_redirect_url_withfrag(self): 168 redirect_url_with_frag = "http://www.pythontest.net/redir/with_frag/" 169 with support.transient_internet(redirect_url_with_frag): 170 req = urllib.request.Request(redirect_url_with_frag) 171 res = urllib.request.urlopen(req) 172 self.assertEqual(res.geturl(), 173 "http://www.pythontest.net/elsewhere/#frag") 174 175 def test_custom_headers(self): 176 url = support.TEST_HTTP_URL 177 with support.transient_internet(url): 178 opener = urllib.request.build_opener() 179 request = urllib.request.Request(url) 180 self.assertFalse(request.header_items()) 181 opener.open(request) 182 self.assertTrue(request.header_items()) 183 self.assertTrue(request.has_header('User-agent')) 184 request.add_header('User-Agent','Test-Agent') 185 opener.open(request) 186 self.assertEqual(request.get_header('User-agent'),'Test-Agent') 187 188 @unittest.skip('XXX: http://www.imdb.com is gone') 189 def test_sites_no_connection_close(self): 190 # Some sites do not send Connection: close header. 191 # Verify that those work properly. (#issue12576) 192 193 URL = 'http://www.imdb.com' # mangles Connection:close 194 195 with support.transient_internet(URL): 196 try: 197 with urllib.request.urlopen(URL) as res: 198 pass 199 except ValueError as e: 200 self.fail("urlopen failed for site not sending \ 201 Connection:close") 202 else: 203 self.assertTrue(res) 204 205 req = urllib.request.urlopen(URL) 206 res = req.read() 207 self.assertTrue(res) 208 209 def _test_urls(self, urls, handlers, retry=True): 210 import time 211 import logging 212 debug = logging.getLogger("test_urllib2").debug 213 214 urlopen = urllib.request.build_opener(*handlers).open 215 if retry: 216 urlopen = _wrap_with_retry_thrice(urlopen, urllib.error.URLError) 217 218 for url in urls: 219 with self.subTest(url=url): 220 if isinstance(url, tuple): 221 url, req, expected_err = url 222 else: 223 req = expected_err = None 224 225 with support.transient_internet(url): 226 try: 227 f = urlopen(url, req, TIMEOUT) 228 # urllib.error.URLError is a subclass of OSError 229 except OSError as err: 230 if expected_err: 231 msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" % 232 (expected_err, url, req, type(err), err)) 233 self.assertIsInstance(err, expected_err, msg) 234 else: 235 raise 236 else: 237 try: 238 with support.time_out, \ 239 support.socket_peer_reset, \ 240 support.ioerror_peer_reset: 241 buf = f.read() 242 debug("read %d bytes" % len(buf)) 243 except socket.timeout: 244 print("<timeout: %s>" % url, file=sys.stderr) 245 f.close() 246 time.sleep(0.1) 247 248 def _extra_handlers(self): 249 handlers = [] 250 251 cfh = urllib.request.CacheFTPHandler() 252 self.addCleanup(cfh.clear_cache) 253 cfh.setTimeout(1) 254 handlers.append(cfh) 255 256 return handlers 257 258 259class TimeoutTest(unittest.TestCase): 260 def test_http_basic(self): 261 self.assertIsNone(socket.getdefaulttimeout()) 262 url = support.TEST_HTTP_URL 263 with support.transient_internet(url, timeout=None): 264 u = _urlopen_with_retry(url) 265 self.addCleanup(u.close) 266 self.assertIsNone(u.fp.raw._sock.gettimeout()) 267 268 def test_http_default_timeout(self): 269 self.assertIsNone(socket.getdefaulttimeout()) 270 url = support.TEST_HTTP_URL 271 with support.transient_internet(url): 272 socket.setdefaulttimeout(60) 273 try: 274 u = _urlopen_with_retry(url) 275 self.addCleanup(u.close) 276 finally: 277 socket.setdefaulttimeout(None) 278 self.assertEqual(u.fp.raw._sock.gettimeout(), 60) 279 280 def test_http_no_timeout(self): 281 self.assertIsNone(socket.getdefaulttimeout()) 282 url = support.TEST_HTTP_URL 283 with support.transient_internet(url): 284 socket.setdefaulttimeout(60) 285 try: 286 u = _urlopen_with_retry(url, timeout=None) 287 self.addCleanup(u.close) 288 finally: 289 socket.setdefaulttimeout(None) 290 self.assertIsNone(u.fp.raw._sock.gettimeout()) 291 292 def test_http_timeout(self): 293 url = support.TEST_HTTP_URL 294 with support.transient_internet(url): 295 u = _urlopen_with_retry(url, timeout=120) 296 self.addCleanup(u.close) 297 self.assertEqual(u.fp.raw._sock.gettimeout(), 120) 298 299 FTP_HOST = 'ftp://www.pythontest.net/' 300 301 @skip_ftp_test_on_travis 302 def test_ftp_basic(self): 303 self.assertIsNone(socket.getdefaulttimeout()) 304 with support.transient_internet(self.FTP_HOST, timeout=None): 305 u = _urlopen_with_retry(self.FTP_HOST) 306 self.addCleanup(u.close) 307 self.assertIsNone(u.fp.fp.raw._sock.gettimeout()) 308 309 @skip_ftp_test_on_travis 310 def test_ftp_default_timeout(self): 311 self.assertIsNone(socket.getdefaulttimeout()) 312 with support.transient_internet(self.FTP_HOST): 313 socket.setdefaulttimeout(60) 314 try: 315 u = _urlopen_with_retry(self.FTP_HOST) 316 self.addCleanup(u.close) 317 finally: 318 socket.setdefaulttimeout(None) 319 self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60) 320 321 @skip_ftp_test_on_travis 322 def test_ftp_no_timeout(self): 323 self.assertIsNone(socket.getdefaulttimeout()) 324 with support.transient_internet(self.FTP_HOST): 325 socket.setdefaulttimeout(60) 326 try: 327 u = _urlopen_with_retry(self.FTP_HOST, timeout=None) 328 self.addCleanup(u.close) 329 finally: 330 socket.setdefaulttimeout(None) 331 self.assertIsNone(u.fp.fp.raw._sock.gettimeout()) 332 333 @skip_ftp_test_on_travis 334 def test_ftp_timeout(self): 335 with support.transient_internet(self.FTP_HOST): 336 u = _urlopen_with_retry(self.FTP_HOST, timeout=60) 337 self.addCleanup(u.close) 338 self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60) 339 340 341if __name__ == "__main__": 342 unittest.main() 343