import errno
import unittest
from test import support
from test.support import os_helper
from test.support import socket_helper
from test.support import ResourceDenied
from test.test_urllib2 import sanepathname2url

import os
import socket
import urllib.error
import urllib.request
import sys

support.requires("network")
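
# Note: when this file is run through CPython's regrtest (``python -m test
# test_urllib2net``), the "network" resource must be enabled (e.g. with
# ``-u network``) or support.requires() raises ResourceDenied and the module
# is skipped.  When the module is executed directly, all resources are
# treated as enabled.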


def _retry_thrice(func, exc, *args, **kwargs):
    for i in range(3):
        try:
            return func(*args, **kwargs)
        except exc as e:
            last_exc = e
            continue
    raise last_exc

def _wrap_with_retry_thrice(func, exc):
    def wrapped(*args, **kwargs):
        return _retry_thrice(func, exc, *args, **kwargs)
    return wrapped

# Connecting to remote hosts is flaky.  Make it more robust by retrying
# the connection several times.
_urlopen_with_retry = _wrap_with_retry_thrice(urllib.request.urlopen,
                                              urllib.error.URLError)


class TransientResource(object):

    """Raise ResourceDenied if an exception matching the specified exception
    type and attributes is raised while the context manager is in effect."""

    def __init__(self, exc, **kwargs):
        self.exc = exc
        self.attrs = kwargs

    def __enter__(self):
        return self

    def __exit__(self, type_=None, value=None, traceback=None):
        """If type_ is a subclass of self.exc and value has attributes matching
        self.attrs, raise ResourceDenied.  Otherwise let the exception
        propagate (if any)."""
        # Check the raised type against self.exc, as described in the
        # docstring above.
        if type_ is not None and issubclass(type_, self.exc):
            for attr, attr_value in self.attrs.items():
                if not hasattr(value, attr):
                    break
                if getattr(value, attr) != attr_value:
                    break
            else:
                raise ResourceDenied("an optional resource is not available")

# Context managers that raise ResourceDenied when various issues
# with the internet connection manifest themselves as exceptions.
# XXX deprecate these and use transient_internet() instead
time_out = TransientResource(OSError, errno=errno.ETIMEDOUT)
socket_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
ioerror_peer_reset = TransientResource(OSError, errno=errno.ECONNRESET)
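
# These context managers are used in OtherNetworkTests._test_urls() below:
# wrapping a read in them converts a matching transient network error into
# ResourceDenied instead of a test failure, e.g.:
#
#     with time_out, socket_peer_reset, ioerror_peer_reset:
#         buf = f.read()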


class AuthTests(unittest.TestCase):
    """Tests urllib2 authentication features."""

## Disabled at the moment since there is no page under python.org which
## could be used for HTTP authentication.
#
#    def test_basic_auth(self):
#        import http.client
#
#        test_url = "http://www.python.org/test/test_urllib2/basic_auth"
#        test_hostport = "www.python.org"
#        test_realm = 'Test Realm'
#        test_user = 'test.test_urllib2net'
#        test_password = 'blah'
#
#        # failure
#        try:
#            _urlopen_with_retry(test_url)
#        except urllib2.HTTPError, exc:
#            self.assertEqual(exc.code, 401)
#        else:
#            self.fail("urlopen() should have failed with 401")
#
#        # success
#        auth_handler = urllib2.HTTPBasicAuthHandler()
#        auth_handler.add_password(test_realm, test_hostport,
#                                  test_user, test_password)
#        opener = urllib2.build_opener(auth_handler)
#        f = opener.open('http://localhost/')
#        response = _urlopen_with_retry("http://www.python.org/")
#
#        # The 'userinfo' URL component is deprecated by RFC 3986 for security
#        # reasons, let's not implement it!  (it's already implemented for proxy
#        # specification strings (that is, URLs or authorities specifying a
#        # proxy), so we must keep that)
#        self.assertRaises(http.client.InvalidURL,
#                          urllib2.urlopen, "http://evil:thing@example.com")


class CloseSocketTest(unittest.TestCase):

    def test_close(self):
        # clear _opener global variable
        self.addCleanup(urllib.request.urlcleanup)

        # calling .close() on urllib2's response objects should close the
        # underlying socket
        url = support.TEST_HTTP_URL
        with socket_helper.transient_internet(url):
            response = _urlopen_with_retry(url)
            sock = response.fp
            self.assertFalse(sock.closed)
            response.close()
            self.assertTrue(sock.closed)


class OtherNetworkTests(unittest.TestCase):
    def setUp(self):
        if 0:  # for debugging
            import logging
            logger = logging.getLogger("test_urllib2net")
            logger.addHandler(logging.StreamHandler())

    # XXX The rest of these tests aren't very good -- they don't check much.
    # They do sometimes catch some major disasters, though.

    @support.requires_resource('walltime')
    def test_ftp(self):
        # Testing the same URL twice exercises the caching in CacheFTPHandler
        urls = [
            'ftp://www.pythontest.net/README',
            'ftp://www.pythontest.net/README',
            ('ftp://www.pythontest.net/non-existent-file',
             None, urllib.error.URLError),
            ]
        self._test_urls(urls, self._extra_handlers())

    def test_file(self):
        TESTFN = os_helper.TESTFN
        f = open(TESTFN, 'w')
        try:
            f.write('hi there\n')
            f.close()
            urls = [
                'file:' + sanepathname2url(os.path.abspath(TESTFN)),
                ('file:///nonsensename/etc/passwd', None,
                 urllib.error.URLError),
                ]
            self._test_urls(urls, self._extra_handlers(), retry=True)
        finally:
            os.remove(TESTFN)

        self.assertRaises(ValueError, urllib.request.urlopen,
                          './relative_path/to/file')

    # XXX Following test depends on machine configurations that are internal
    # to CNRI.  Need to set up a public server with the right authentication
    # configuration for test purposes.

##     def test_cnri(self):
##         if socket.gethostname() == 'bitdiddle':
##             localhost = 'bitdiddle.cnri.reston.va.us'
##         elif socket.gethostname() == 'bitdiddle.concentric.net':
##             localhost = 'localhost'
##         else:
##             localhost = None
##         if localhost is not None:
##             urls = [
##                 'file://%s/etc/passwd' % localhost,
##                 'http://%s/simple/' % localhost,
##                 'http://%s/digest/' % localhost,
##                 'http://%s/not/found.h' % localhost,
##                 ]

##             bauth = HTTPBasicAuthHandler()
##             bauth.add_password('basic_test_realm', localhost, 'jhylton',
##                                'password')
##             dauth = HTTPDigestAuthHandler()
##             dauth.add_password('digest_test_realm', localhost, 'jhylton',
##                                'password')

##             self._test_urls(urls, self._extra_handlers()+[bauth, dauth])

    def test_urlwithfrag(self):
        urlwith_frag = "http://www.pythontest.net/index.html#frag"
        with socket_helper.transient_internet(urlwith_frag):
            req = urllib.request.Request(urlwith_frag)
            res = urllib.request.urlopen(req)
            self.assertEqual(res.geturl(),
                             "http://www.pythontest.net/index.html#frag")

    @support.requires_resource('walltime')
    def test_redirect_url_withfrag(self):
        redirect_url_with_frag = "http://www.pythontest.net/redir/with_frag/"
        with socket_helper.transient_internet(redirect_url_with_frag):
            req = urllib.request.Request(redirect_url_with_frag)
            res = urllib.request.urlopen(req)
            self.assertEqual(res.geturl(),
                             "http://www.pythontest.net/elsewhere/#frag")

    def test_custom_headers(self):
        url = support.TEST_HTTP_URL
        with socket_helper.transient_internet(url):
            opener = urllib.request.build_opener()
            request = urllib.request.Request(url)
            self.assertFalse(request.header_items())
            opener.open(request)
            self.assertTrue(request.header_items())
            self.assertTrue(request.has_header('User-agent'))
            request.add_header('User-Agent', 'Test-Agent')
            opener.open(request)
            self.assertEqual(request.get_header('User-agent'), 'Test-Agent')

    @unittest.skip('XXX: http://www.imdb.com is gone')
    def test_sites_no_connection_close(self):
        # Some sites do not send Connection: close header.
        # Verify that those work properly. (#issue12576)

        URL = 'http://www.imdb.com'  # mangles Connection:close

        with socket_helper.transient_internet(URL):
            try:
                with urllib.request.urlopen(URL) as res:
                    pass
            except ValueError:
                self.fail("urlopen failed for site not sending "
                          "Connection:close")
            else:
                self.assertTrue(res)

            req = urllib.request.urlopen(URL)
            res = req.read()
            self.assertTrue(res)
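
    # Note: each entry in ``urls`` handled by _test_urls() is either a plain
    # URL string or a (url, data, expected_exception) tuple; ``data`` is
    # passed through as the second argument to urlopen() and
    # ``expected_exception`` is the error type the open is expected to raise.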

    def _test_urls(self, urls, handlers, retry=True):
        import time
        import logging
        debug = logging.getLogger("test_urllib2").debug

        urlopen = urllib.request.build_opener(*handlers).open
        if retry:
            urlopen = _wrap_with_retry_thrice(urlopen, urllib.error.URLError)

        for url in urls:
            with self.subTest(url=url):
                if isinstance(url, tuple):
                    url, req, expected_err = url
                else:
                    req = expected_err = None

                with socket_helper.transient_internet(url):
                    try:
                        f = urlopen(url, req, support.INTERNET_TIMEOUT)
                        # urllib.error.URLError is a subclass of OSError
                    except OSError as err:
                        if expected_err:
                            msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
                                   (expected_err, url, req, type(err), err))
                            self.assertIsInstance(err, expected_err, msg)
                        else:
                            raise
                    else:
                        try:
                            with time_out, \
                                 socket_peer_reset, \
                                 ioerror_peer_reset:
                                buf = f.read()
                                debug("read %d bytes" % len(buf))
                        except TimeoutError:
                            print("<timeout: %s>" % url, file=sys.stderr)
                        f.close()
            time.sleep(0.1)

    def _extra_handlers(self):
        handlers = []

        cfh = urllib.request.CacheFTPHandler()
        self.addCleanup(cfh.clear_cache)
        cfh.setTimeout(1)
        handlers.append(cfh)

        return handlers


class TimeoutTest(unittest.TestCase):
    def setUp(self):
        # clear _opener global variable
        self.addCleanup(urllib.request.urlcleanup)

    def test_http_basic(self):
        self.assertIsNone(socket.getdefaulttimeout())
        url = support.TEST_HTTP_URL
        with socket_helper.transient_internet(url, timeout=None):
            u = _urlopen_with_retry(url)
            self.addCleanup(u.close)
            self.assertIsNone(u.fp.raw._sock.gettimeout())

    def test_http_default_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        url = support.TEST_HTTP_URL
        with socket_helper.transient_internet(url):
            socket.setdefaulttimeout(60)
            try:
                u = _urlopen_with_retry(url)
                self.addCleanup(u.close)
            finally:
                socket.setdefaulttimeout(None)
            self.assertEqual(u.fp.raw._sock.gettimeout(), 60)

    def test_http_no_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        url = support.TEST_HTTP_URL
        with socket_helper.transient_internet(url):
            socket.setdefaulttimeout(60)
            try:
                u = _urlopen_with_retry(url, timeout=None)
                self.addCleanup(u.close)
            finally:
                socket.setdefaulttimeout(None)
            self.assertIsNone(u.fp.raw._sock.gettimeout())

    def test_http_timeout(self):
        url = support.TEST_HTTP_URL
        with socket_helper.transient_internet(url):
            u = _urlopen_with_retry(url, timeout=120)
            self.addCleanup(u.close)
            self.assertEqual(u.fp.raw._sock.gettimeout(), 120)

    FTP_HOST = 'ftp://www.pythontest.net/'
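
    # Note (implementation detail, may change across versions): for FTP URLs,
    # urlopen() returns an addinfourl whose ``fp`` wraps the data connection's
    # file object, so the raw socket sits one level deeper than in the HTTP
    # case -- hence ``u.fp.fp.raw._sock`` in the FTP tests below versus
    # ``u.fp.raw._sock`` in the HTTP tests above.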

    @support.requires_resource('walltime')
    def test_ftp_basic(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with socket_helper.transient_internet(self.FTP_HOST, timeout=None):
            u = _urlopen_with_retry(self.FTP_HOST)
            self.addCleanup(u.close)
            self.assertIsNone(u.fp.fp.raw._sock.gettimeout())

    def test_ftp_default_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with socket_helper.transient_internet(self.FTP_HOST):
            socket.setdefaulttimeout(60)
            try:
                u = _urlopen_with_retry(self.FTP_HOST)
                self.addCleanup(u.close)
            finally:
                socket.setdefaulttimeout(None)
            self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60)

    @support.requires_resource('walltime')
    def test_ftp_no_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with socket_helper.transient_internet(self.FTP_HOST):
            socket.setdefaulttimeout(60)
            try:
                u = _urlopen_with_retry(self.FTP_HOST, timeout=None)
                self.addCleanup(u.close)
            finally:
                socket.setdefaulttimeout(None)
            self.assertIsNone(u.fp.fp.raw._sock.gettimeout())

    @support.requires_resource('walltime')
    def test_ftp_timeout(self):
        with socket_helper.transient_internet(self.FTP_HOST):
            u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
            self.addCleanup(u.close)
            self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60)


if __name__ == "__main__":
    unittest.main()