"""Network-dependent tests for urllib2 (run only with the "network" resource)."""

import unittest
from test import test_support
from test.test_urllib2 import sanepathname2url

import socket
import urllib2
import os
import sys

TIMEOUT = 60  # seconds


def _retry_thrice(func, exc, *args, **kwargs):
    """Call ``func(*args, **kwargs)``, trying up to three times.

    A call that raises ``exc`` is retried; any other exception propagates
    immediately.  Returns the result of the first successful call.  If all
    three attempts raise ``exc``, the last caught instance is re-raised.
    """
    last_exc = None
    for _ in range(3):
        try:
            return func(*args, **kwargs)
        except exc as caught:
            # Remember the failure so it can be re-raised once we give up.
            last_exc = caught
    raise last_exc


def _wrap_with_retry_thrice(func, exc):
    """Return a callable that runs ``func`` through ``_retry_thrice``."""
    def wrapped(*args, **kwargs):
        return _retry_thrice(func, exc, *args, **kwargs)
    return wrapped

# Connecting to remote hosts is flaky.  Make it more robust by retrying
# the connection several times.
_urlopen_with_retry = _wrap_with_retry_thrice(urllib2.urlopen, urllib2.URLError)


class AuthTests(unittest.TestCase):
    """Tests urllib2 authentication features."""

## Disabled at the moment since there is no page under python.org which
## could be used to test HTTP authentication.
#
#    def test_basic_auth(self):
#        import httplib
#
#        test_url = "http://www.python.org/test/test_urllib2/basic_auth"
#        test_hostport = "www.python.org"
#        test_realm = 'Test Realm'
#        test_user = 'test.test_urllib2net'
#        test_password = 'blah'
#
#        # failure
#        try:
#            _urlopen_with_retry(test_url)
#        except urllib2.HTTPError, exc:
#            self.assertEqual(exc.code, 401)
#        else:
#            self.fail("urlopen() should have failed with 401")
#
#        # success
#        auth_handler = urllib2.HTTPBasicAuthHandler()
#        auth_handler.add_password(test_realm, test_hostport,
#                                  test_user, test_password)
#        opener = urllib2.build_opener(auth_handler)
#        f = opener.open('http://localhost/')
#        response = _urlopen_with_retry("http://www.python.org/")
#
#        # The 'userinfo' URL component is deprecated by RFC 3986 for security
#        # reasons, let's not implement it!  (it's already implemented for proxy
#        # specification strings (that is, URLs or authorities specifying a
#        # proxy), so we must keep that)
#        self.assertRaises(httplib.InvalidURL,
#                          urllib2.urlopen, "http://evil:thing@example.com")


class CloseSocketTest(unittest.TestCase):

    def test_close(self):
        """Closing a urllib2 response must close the underlying socket file."""
        import httplib

        # calling .close() on urllib2's response objects should close the
        # underlying socket
        url = "http://www.example.com/"
        with test_support.transient_internet(url):
            response = _urlopen_with_retry(url)
            # Make sure the response is released even if an assertion below
            # fails before the explicit close().
            self.addCleanup(response.close)

            # delve deep into response to fetch socket._socketobject
            abused_fileobject = response.fp
            self.assertIs(abused_fileobject.__class__, socket._fileobject)
            httpresponse = abused_fileobject._sock
            self.assertIs(httpresponse.__class__, httplib.HTTPResponse)
            fileobject = httpresponse.fp
            self.assertIs(fileobject.__class__, socket._fileobject)

            self.assertFalse(fileobject.closed)
            response.close()
            self.assertTrue(fileobject.closed)


class OtherNetworkTests(unittest.TestCase):
    def setUp(self):
        if 0:  # for debugging
            import logging
            logger = logging.getLogger("test_urllib2net")
            logger.addHandler(logging.StreamHandler())

    # XXX The rest of these tests aren't very good -- they don't check much.
    # They do sometimes catch some major disasters, though.

    def test_ftp(self):
        """Fetch an existing FTP file and expect URLError for a missing one."""
        urls = [
            'ftp://www.pythontest.net/README',
            ('ftp://www.pythontest.net/non-existent-file',
             None, urllib2.URLError),
        ]
        self._test_urls(urls, self._extra_handlers())

    def test_file(self):
        """Open a local file: URL and expect URLError for a bogus path."""
        TESTFN = test_support.TESTFN
        try:
            # The with-statement closes the file even if the write fails.
            with open(TESTFN, 'w') as f:
                f.write('hi there\n')
            urls = [
                'file:' + sanepathname2url(os.path.abspath(TESTFN)),
                ('file:///nonsensename/etc/passwd', None, urllib2.URLError),
            ]
            self._test_urls(urls, self._extra_handlers(), retry=True)
        finally:
            # unlink() tolerates a missing file, so a failure to create the
            # file is not masked by a second error raised during cleanup.
            test_support.unlink(TESTFN)

        self.assertRaises(ValueError, urllib2.urlopen,
                          './relative_path/to/file')

    # XXX Following test depends on machine configurations that are internal
    # to CNRI.  Need to set up a public server with the right authentication
    # configuration for test purposes.

##     def test_cnri(self):
##         if socket.gethostname() == 'bitdiddle':
##             localhost = 'bitdiddle.cnri.reston.va.us'
##         elif socket.gethostname() == 'bitdiddle.concentric.net':
##             localhost = 'localhost'
##         else:
##             localhost = None
##         if localhost is not None:
##             urls = [
##                 'file://%s/etc/passwd' % localhost,
##                 'http://%s/simple/' % localhost,
##                 'http://%s/digest/' % localhost,
##                 'http://%s/not/found.h' % localhost,
##                 ]

##             bauth = HTTPBasicAuthHandler()
##             bauth.add_password('basic_test_realm', localhost, 'jhylton',
##                                'password')
##             dauth = HTTPDigestAuthHandler()
##             dauth.add_password('digest_test_realm', localhost, 'jhylton',
##                                'password')

##             self._test_urls(urls, self._extra_handlers()+[bauth, dauth])

    def test_urlwithfrag(self):
        """geturl() must report the URL including its #fragment."""
        urlwith_frag = "http://www.pythontest.net/index.html#frag"
        with test_support.transient_internet(urlwith_frag):
            req = urllib2.Request(urlwith_frag)
            res = urllib2.urlopen(req)
            try:
                self.assertEqual(res.geturl(),
                                 "http://www.pythontest.net/index.html#frag")
            finally:
                res.close()

    def test_fileno(self):
        """The object returned by opener.open() must support fileno()."""
        url = "http://www.example.com"
        with test_support.transient_internet(url):
            req = urllib2.Request(url)
            opener = urllib2.build_opener()
            res = opener.open(req)
            try:
                res.fileno()
            except AttributeError:
                self.fail("HTTPResponse object should return a valid fileno")
            finally:
                res.close()

    def test_custom_headers(self):
        """Opening a request adds a User-agent header; a custom one is kept."""
        url = "http://www.example.com"
        with test_support.transient_internet(url):
            opener = urllib2.build_opener()
            request = urllib2.Request(url)
            self.assertFalse(request.header_items())
            # Only the side effect on `request` matters here, so release the
            # response straight away.
            opener.open(request).close()
            self.assertTrue(request.header_items())
            self.assertTrue(request.has_header('User-agent'))
            request.add_header('User-Agent', 'Test-Agent')
            opener.open(request).close()
            self.assertEqual(request.get_header('User-agent'), 'Test-Agent')

    @unittest.skip('XXX: http://www.imdb.com is gone')
    def test_sites_no_connection_close(self):
        # Some sites do not send Connection: close header.
        # Verify that those work properly. (#issue12576)

        URL = 'http://www.imdb.com'  # No Connection:close
        with test_support.transient_internet(URL):
            response = urllib2.urlopen(URL)
            try:
                body = response.read()
            finally:
                response.close()
            self.assertTrue(body)

    def _test_urls(self, urls, handlers, retry=True):
        """Open each entry of ``urls`` and read it, checking expected errors.

        ``urls`` holds either plain URL strings or ``(url, req, expected_err)``
        tuples, where ``req`` is passed to the opener as the request data and
        ``expected_err`` is the exception type (or tuple of types) the open
        call is expected to raise.  ``handlers`` are passed to
        ``urllib2.build_opener``.  When ``retry`` is true, opening is retried
        on ``urllib2.URLError``.

        Timeouts are reported on stderr rather than treated as failures.
        """
        import time
        import logging
        debug = logging.getLogger("test_urllib2").debug

        urlopen = urllib2.build_opener(*handlers).open
        if retry:
            urlopen = _wrap_with_retry_thrice(urlopen, urllib2.URLError)

        for url in urls:
            if isinstance(url, tuple):
                url, req, expected_err = url
            else:
                req = expected_err = None
            with test_support.transient_internet(url):
                debug(url)
                try:
                    f = urlopen(url, req, TIMEOUT)
                except EnvironmentError as err:
                    # urllib2.URLError is a subclass of IOError, so it is
                    # caught here as well; a separate ``except URLError``
                    # clause placed after this one could never run.
                    debug(err)
                    timed_out = (
                        isinstance(err, urllib2.URLError) and
                        isinstance(getattr(err, 'reason', None),
                                   socket.timeout))
                    if timed_out:
                        sys.stderr.write("<timeout: %s>\n" % url)
                        continue
                    if expected_err:
                        msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
                               (expected_err, url, req, type(err), err))
                        self.assertIsInstance(err, expected_err, msg)
                    # Otherwise the error is only logged: these tests are
                    # deliberately lenient about flaky remote hosts.
                else:
                    try:
                        with test_support.transient_internet(url):
                            buf = f.read()
                            debug("read %d bytes" % len(buf))
                    except socket.timeout:
                        sys.stderr.write("<timeout: %s>\n" % url)
                    finally:
                        # Close the response whatever read() raised.
                        f.close()
            debug("******** next url coming up...")
            time.sleep(0.1)

    def _extra_handlers(self):
        """Return the extra handlers used by the URL tests.

        Currently a single CacheFTPHandler with a 1 second cache timeout,
        whose cache is cleared when the test finishes.
        """
        handlers = []

        cfh = urllib2.CacheFTPHandler()
        self.addCleanup(cfh.clear_cache)
        cfh.setTimeout(1)
        handlers.append(cfh)

        return handlers


class TimeoutTest(unittest.TestCase):
    """Check which timeout ends up on the socket underlying a response."""

    def test_http_basic(self):
        self.assertIsNone(socket.getdefaulttimeout())
        url = "http://www.example.com"
        with test_support.transient_internet(url, timeout=None):
            u = _urlopen_with_retry(url)
            self.addCleanup(u.close)
            self.assertIsNone(u.fp._sock.fp._sock.gettimeout())

    def test_http_default_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        url = "http://www.example.com"
        with test_support.transient_internet(url):
            socket.setdefaulttimeout(60)
            try:
                u = _urlopen_with_retry(url)
                self.addCleanup(u.close)
            finally:
                socket.setdefaulttimeout(None)
            self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 60)

    def test_http_no_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        url = "http://www.example.com"
        with test_support.transient_internet(url):
            socket.setdefaulttimeout(60)
            try:
                u = _urlopen_with_retry(url, timeout=None)
                self.addCleanup(u.close)
            finally:
                socket.setdefaulttimeout(None)
            self.assertIsNone(u.fp._sock.fp._sock.gettimeout())

    def test_http_timeout(self):
        url = "http://www.example.com"
        with test_support.transient_internet(url):
            u = _urlopen_with_retry(url, timeout=120)
            self.addCleanup(u.close)
            self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 120)

    FTP_HOST = 'ftp://www.pythontest.net/'

    def test_ftp_basic(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with test_support.transient_internet(self.FTP_HOST, timeout=None):
            u = _urlopen_with_retry(self.FTP_HOST)
            self.addCleanup(u.close)
            self.assertIsNone(u.fp.fp._sock.gettimeout())

    def test_ftp_default_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with test_support.transient_internet(self.FTP_HOST):
            socket.setdefaulttimeout(60)
            try:
                u = _urlopen_with_retry(self.FTP_HOST)
                self.addCleanup(u.close)
            finally:
                socket.setdefaulttimeout(None)
            self.assertEqual(u.fp.fp._sock.gettimeout(), 60)

    def test_ftp_no_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with test_support.transient_internet(self.FTP_HOST):
            socket.setdefaulttimeout(60)
            try:
                u = _urlopen_with_retry(self.FTP_HOST, timeout=None)
                self.addCleanup(u.close)
            finally:
                socket.setdefaulttimeout(None)
            self.assertIsNone(u.fp.fp._sock.gettimeout())

    def test_ftp_timeout(self):
        with test_support.transient_internet(self.FTP_HOST):
            u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
            self.addCleanup(u.close)
            self.assertEqual(u.fp.fp._sock.gettimeout(), 60)


def test_main():
    """Run all test classes of this module; requires the "network" resource."""
    test_support.requires("network")
    test_support.run_unittest(AuthTests,
                              OtherNetworkTests,
                              CloseSocketTest,
                              TimeoutTest,
                              )

if __name__ == "__main__":
    test_main()