• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import os
2import base64
3import urlparse
4import urllib2
5import BaseHTTPServer
6import unittest
7import hashlib
8
9from test import test_support
10
11mimetools = test_support.import_module('mimetools', deprecated=True)
12threading = test_support.import_module('threading')
13
14try:
15    import ssl
16except ImportError:
17    ssl = None
18
19here = os.path.dirname(__file__)
20# Self-signed cert file for 'localhost'
21CERT_localhost = os.path.join(here, 'keycert.pem')
22# Self-signed cert file for 'fakehostname'
23CERT_fakehostname = os.path.join(here, 'keycert2.pem')
24
25# Loopback http server infrastructure
26
27class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
28    """HTTP server w/ a few modifications that make it useful for
29    loopback testing purposes.
30    """
31
32    def __init__(self, server_address, RequestHandlerClass):
33        BaseHTTPServer.HTTPServer.__init__(self,
34                                           server_address,
35                                           RequestHandlerClass)
36
37        # Set the timeout of our listening socket really low so
38        # that we can stop the server easily.
39        self.socket.settimeout(0.1)
40
41    def get_request(self):
42        """BaseHTTPServer method, overridden."""
43
44        request, client_address = self.socket.accept()
45
46        # It's a loopback connection, so setting the timeout
47        # really low shouldn't affect anything, but should make
48        # deadlocks less likely to occur.
49        request.settimeout(10.0)
50
51        return (request, client_address)
52
53class LoopbackHttpServerThread(threading.Thread):
54    """Stoppable thread that runs a loopback http server."""
55
56    def __init__(self, request_handler):
57        threading.Thread.__init__(self)
58        self._stop = False
59        self.ready = threading.Event()
60        request_handler.protocol_version = "HTTP/1.0"
61        self.httpd = LoopbackHttpServer(('127.0.0.1', 0),
62                                        request_handler)
63        #print "Serving HTTP on %s port %s" % (self.httpd.server_name,
64        #                                      self.httpd.server_port)
65        self.port = self.httpd.server_port
66
67    def stop(self):
68        """Stops the webserver if it's currently running."""
69
70        # Set the stop flag.
71        self._stop = True
72
73        self.join()
74
75    def run(self):
76        self.ready.set()
77        while not self._stop:
78            self.httpd.handle_request()
79
80# Authentication infrastructure
81
82
83class BasicAuthHandler(BaseHTTPServer.BaseHTTPRequestHandler):
84    """Handler for performing Basic Authentication."""
85    # Server side values
86    USER = "testUser"
87    PASSWD = "testPass"
88    REALM = "Test"
89    USER_PASSWD = "%s:%s" % (USER, PASSWD)
90    ENCODED_AUTH = base64.b64encode(USER_PASSWD)
91
92    def __init__(self, *args, **kwargs):
93        BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
94
95    def log_message(self, format, *args):
96        # Suppress the HTTP Console log output
97        pass
98
99    def do_HEAD(self):
100        self.send_response(200)
101        self.send_header("Content-type", "text/html")
102        self.end_headers()
103
104    def do_AUTHHEAD(self):
105        self.send_response(401)
106        self.send_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.REALM)
107        self.send_header("Content-type", "text/html")
108        self.end_headers()
109
110    def do_GET(self):
111        if self.headers.getheader("Authorization") == None:
112            self.do_AUTHHEAD()
113            self.wfile.write("No Auth Header Received")
114        elif self.headers.getheader(
115                "Authorization") == "Basic " + self.ENCODED_AUTH:
116            self.wfile.write("It works!")
117        else:
118            # Unauthorized Request
119            self.do_AUTHHEAD()
120
121
122class DigestAuthHandler:
123    """Handler for performing digest authentication."""
124
125    def __init__(self):
126        self._request_num = 0
127        self._nonces = []
128        self._users = {}
129        self._realm_name = "Test Realm"
130        self._qop = "auth"
131
132    def set_qop(self, qop):
133        self._qop = qop
134
135    def set_users(self, users):
136        assert isinstance(users, dict)
137        self._users = users
138
139    def set_realm(self, realm):
140        self._realm_name = realm
141
142    def _generate_nonce(self):
143        self._request_num += 1
144        nonce = hashlib.md5(str(self._request_num)).hexdigest()
145        self._nonces.append(nonce)
146        return nonce
147
148    def _create_auth_dict(self, auth_str):
149        first_space_index = auth_str.find(" ")
150        auth_str = auth_str[first_space_index+1:]
151
152        parts = auth_str.split(",")
153
154        auth_dict = {}
155        for part in parts:
156            name, value = part.split("=")
157            name = name.strip()
158            if value[0] == '"' and value[-1] == '"':
159                value = value[1:-1]
160            else:
161                value = value.strip()
162            auth_dict[name] = value
163        return auth_dict
164
165    def _validate_auth(self, auth_dict, password, method, uri):
166        final_dict = {}
167        final_dict.update(auth_dict)
168        final_dict["password"] = password
169        final_dict["method"] = method
170        final_dict["uri"] = uri
171        HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
172        HA1 = hashlib.md5(HA1_str).hexdigest()
173        HA2_str = "%(method)s:%(uri)s" % final_dict
174        HA2 = hashlib.md5(HA2_str).hexdigest()
175        final_dict["HA1"] = HA1
176        final_dict["HA2"] = HA2
177        response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
178                       "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
179        response = hashlib.md5(response_str).hexdigest()
180
181        return response == auth_dict["response"]
182
183    def _return_auth_challenge(self, request_handler):
184        request_handler.send_response(407, "Proxy Authentication Required")
185        request_handler.send_header("Content-Type", "text/html")
186        request_handler.send_header(
187            'Proxy-Authenticate', 'Digest realm="%s", '
188            'qop="%s",'
189            'nonce="%s", ' % \
190            (self._realm_name, self._qop, self._generate_nonce()))
191        # XXX: Not sure if we're supposed to add this next header or
192        # not.
193        #request_handler.send_header('Connection', 'close')
194        request_handler.end_headers()
195        request_handler.wfile.write("Proxy Authentication Required.")
196        return False
197
198    def handle_request(self, request_handler):
199        """Performs digest authentication on the given HTTP request
200        handler.  Returns True if authentication was successful, False
201        otherwise.
202
203        If no users have been set, then digest auth is effectively
204        disabled and this method will always return True.
205        """
206
207        if len(self._users) == 0:
208            return True
209
210        if 'Proxy-Authorization' not in request_handler.headers:
211            return self._return_auth_challenge(request_handler)
212        else:
213            auth_dict = self._create_auth_dict(
214                request_handler.headers['Proxy-Authorization']
215                )
216            if auth_dict["username"] in self._users:
217                password = self._users[ auth_dict["username"] ]
218            else:
219                return self._return_auth_challenge(request_handler)
220            if not auth_dict.get("nonce") in self._nonces:
221                return self._return_auth_challenge(request_handler)
222            else:
223                self._nonces.remove(auth_dict["nonce"])
224
225            auth_validated = False
226
227            # MSIE uses short_path in its validation, but Python's
228            # urllib2 uses the full path, so we're going to see if
229            # either of them works here.
230
231            for path in [request_handler.path, request_handler.short_path]:
232                if self._validate_auth(auth_dict,
233                                       password,
234                                       request_handler.command,
235                                       path):
236                    auth_validated = True
237
238            if not auth_validated:
239                return self._return_auth_challenge(request_handler)
240            return True
241
242# Proxy test infrastructure
243
244class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
245    """This is a 'fake proxy' that makes it look like the entire
246    internet has gone down due to a sudden zombie invasion.  It main
247    utility is in providing us with authentication support for
248    testing.
249    """
250
251    def __init__(self, digest_auth_handler, *args, **kwargs):
252        # This has to be set before calling our parent's __init__(), which will
253        # try to call do_GET().
254        self.digest_auth_handler = digest_auth_handler
255        BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
256
257    def log_message(self, format, *args):
258        # Uncomment the next line for debugging.
259        #sys.stderr.write(format % args)
260        pass
261
262    def do_GET(self):
263        (scm, netloc, path, params, query, fragment) = urlparse.urlparse(
264            self.path, 'http')
265        self.short_path = path
266        if self.digest_auth_handler.handle_request(self):
267            self.send_response(200, "OK")
268            self.send_header("Content-Type", "text/html")
269            self.end_headers()
270            self.wfile.write("You've reached %s!<BR>" % self.path)
271            self.wfile.write("Our apologies, but our server is down due to "
272                              "a sudden zombie invasion.")
273
274# Test cases
275
276class BaseTestCase(unittest.TestCase):
277    def setUp(self):
278        self._threads = test_support.threading_setup()
279
280    def tearDown(self):
281        self.doCleanups()
282        test_support.threading_cleanup(*self._threads)
283
284
285class BasicAuthTests(BaseTestCase):
286    USER = "testUser"
287    PASSWD = "testPass"
288    INCORRECT_PASSWD = "Incorrect"
289    REALM = "Test"
290
291    def setUp(self):
292        super(BasicAuthTests, self).setUp()
293        # With Basic Authentication
294        def http_server_with_basic_auth_handler(*args, **kwargs):
295            return BasicAuthHandler(*args, **kwargs)
296        self.server = LoopbackHttpServerThread(http_server_with_basic_auth_handler)
297        self.server_url = 'http://127.0.0.1:%s' % self.server.port
298        self.server.start()
299        self.server.ready.wait()
300        self.addCleanup(self.server.stop)
301
302    def test_basic_auth_success(self):
303        ah = urllib2.HTTPBasicAuthHandler()
304        ah.add_password(self.REALM, self.server_url, self.USER, self.PASSWD)
305        urllib2.install_opener(urllib2.build_opener(ah))
306        try:
307            self.assertTrue(urllib2.urlopen(self.server_url))
308        except urllib2.HTTPError:
309            self.fail("Basic Auth Failed for url: %s" % self.server_url)
310        except Exception as e:
311            raise e
312
313    def test_basic_auth_httperror(self):
314        ah = urllib2.HTTPBasicAuthHandler()
315        ah.add_password(self.REALM, self.server_url, self.USER,
316                        self.INCORRECT_PASSWD)
317        urllib2.install_opener(urllib2.build_opener(ah))
318        self.assertRaises(urllib2.HTTPError, urllib2.urlopen, self.server_url)
319
320
321class ProxyAuthTests(BaseTestCase):
322    URL = "http://localhost"
323
324    USER = "tester"
325    PASSWD = "test123"
326    REALM = "TestRealm"
327
328    def setUp(self):
329        super(ProxyAuthTests, self).setUp()
330        # Ignore proxy bypass settings in the environment.
331        def restore_environ(old_environ):
332            os.environ.clear()
333            os.environ.update(old_environ)
334        self.addCleanup(restore_environ, os.environ.copy())
335        os.environ['NO_PROXY'] = ''
336        os.environ['no_proxy'] = ''
337
338        self.digest_auth_handler = DigestAuthHandler()
339        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
340        self.digest_auth_handler.set_realm(self.REALM)
341        # With Digest Authentication
342        def create_fake_proxy_handler(*args, **kwargs):
343            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)
344
345        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
346        self.server.start()
347        self.server.ready.wait()
348        self.addCleanup(self.server.stop)
349        proxy_url = "http://127.0.0.1:%d" % self.server.port
350        handler = urllib2.ProxyHandler({"http" : proxy_url})
351        self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()
352        self.opener = urllib2.build_opener(handler, self.proxy_digest_handler)
353
354    def test_proxy_with_bad_password_raises_httperror(self):
355        self.proxy_digest_handler.add_password(self.REALM, self.URL,
356                                               self.USER, self.PASSWD+"bad")
357        self.digest_auth_handler.set_qop("auth")
358        self.assertRaises(urllib2.HTTPError,
359                          self.opener.open,
360                          self.URL)
361
362    def test_proxy_with_no_password_raises_httperror(self):
363        self.digest_auth_handler.set_qop("auth")
364        self.assertRaises(urllib2.HTTPError,
365                          self.opener.open,
366                          self.URL)
367
368    def test_proxy_qop_auth_works(self):
369        self.proxy_digest_handler.add_password(self.REALM, self.URL,
370                                               self.USER, self.PASSWD)
371        self.digest_auth_handler.set_qop("auth")
372        result = self.opener.open(self.URL)
373        while result.read():
374            pass
375        result.close()
376
377    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
378        self.proxy_digest_handler.add_password(self.REALM, self.URL,
379                                               self.USER, self.PASSWD)
380        self.digest_auth_handler.set_qop("auth-int")
381        try:
382            result = self.opener.open(self.URL)
383        except urllib2.URLError:
384            # It's okay if we don't support auth-int, but we certainly
385            # shouldn't receive any kind of exception here other than
386            # a URLError.
387            result = None
388        if result:
389            while result.read():
390                pass
391            result.close()
392
393
394def GetRequestHandler(responses):
395
396    class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
397
398        server_version = "TestHTTP/"
399        requests = []
400        headers_received = []
401        port = 80
402
403        def do_GET(self):
404            body = self.send_head()
405            if body:
406                self.wfile.write(body)
407
408        def do_POST(self):
409            content_length = self.headers['Content-Length']
410            post_data = self.rfile.read(int(content_length))
411            self.do_GET()
412            self.requests.append(post_data)
413
414        def send_head(self):
415            FakeHTTPRequestHandler.headers_received = self.headers
416            self.requests.append(self.path)
417            response_code, headers, body = responses.pop(0)
418
419            self.send_response(response_code)
420
421            for (header, value) in headers:
422                self.send_header(header, value % self.port)
423            if body:
424                self.send_header('Content-type', 'text/plain')
425                self.end_headers()
426                return body
427            self.end_headers()
428
429        def log_message(self, *args):
430            pass
431
432
433    return FakeHTTPRequestHandler
434
435
436class TestUrlopen(BaseTestCase):
437    """Tests urllib2.urlopen using the network.
438
439    These tests are not exhaustive.  Assuming that testing using files does a
440    good job overall of some of the basic interface features.  There are no
441    tests exercising the optional 'data' and 'proxies' arguments.  No tests
442    for transparent redirection have been written.
443    """
444
445    def setUp(self):
446        proxy_handler = urllib2.ProxyHandler({})
447        opener = urllib2.build_opener(proxy_handler)
448        urllib2.install_opener(opener)
449        super(TestUrlopen, self).setUp()
450
451    def urlopen(self, url, data=None, **kwargs):
452        l = []
453        f = urllib2.urlopen(url, data, **kwargs)
454        try:
455            # Exercise various methods
456            l.extend(f.readlines(200))
457            l.append(f.readline())
458            l.append(f.read(1024))
459            l.append(f.read())
460        finally:
461            f.close()
462        return b"".join(l)
463
464    def start_server(self, responses):
465        handler = GetRequestHandler(responses)
466
467        self.server = LoopbackHttpServerThread(handler)
468        self.server.start()
469        self.server.ready.wait()
470        self.addCleanup(self.server.stop)
471        port = self.server.port
472        handler.port = port
473        return handler
474
475    def start_https_server(self, responses=None, **kwargs):
476        if not hasattr(urllib2, 'HTTPSHandler'):
477            self.skipTest('ssl support required')
478        from test.ssl_servers import make_https_server
479        if responses is None:
480            responses = [(200, [], b"we care a bit")]
481        handler = GetRequestHandler(responses)
482        server = make_https_server(self, handler_class=handler, **kwargs)
483        handler.port = server.port
484        return handler
485
486    def test_redirection(self):
487        expected_response = 'We got here...'
488        responses = [
489            (302, [('Location', 'http://localhost:%s/somewhere_else')], ''),
490            (200, [], expected_response)
491        ]
492
493        handler = self.start_server(responses)
494
495        f = urllib2.urlopen('http://localhost:%s/' % handler.port)
496        data = f.read()
497        f.close()
498
499        self.assertEqual(data, expected_response)
500        self.assertEqual(handler.requests, ['/', '/somewhere_else'])
501
502
503    def test_404(self):
504        expected_response = 'Bad bad bad...'
505        handler = self.start_server([(404, [], expected_response)])
506
507        try:
508            urllib2.urlopen('http://localhost:%s/weeble' % handler.port)
509        except urllib2.URLError, f:
510            pass
511        else:
512            self.fail('404 should raise URLError')
513
514        data = f.read()
515        f.close()
516
517        self.assertEqual(data, expected_response)
518        self.assertEqual(handler.requests, ['/weeble'])
519
520
521    def test_200(self):
522        expected_response = 'pycon 2008...'
523        handler = self.start_server([(200, [], expected_response)])
524
525        f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port)
526        data = f.read()
527        f.close()
528
529        self.assertEqual(data, expected_response)
530        self.assertEqual(handler.requests, ['/bizarre'])
531
532    def test_200_with_parameters(self):
533        expected_response = 'pycon 2008...'
534        handler = self.start_server([(200, [], expected_response)])
535
536        f = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port, 'get=with_feeling')
537        data = f.read()
538        f.close()
539
540        self.assertEqual(data, expected_response)
541        self.assertEqual(handler.requests, ['/bizarre', 'get=with_feeling'])
542
543    def test_https(self):
544        handler = self.start_https_server()
545        context = ssl.create_default_context(cafile=CERT_localhost)
546        data = self.urlopen("https://localhost:%s/bizarre" % handler.port, context=context)
547        self.assertEqual(data, b"we care a bit")
548
549    def test_https_with_cafile(self):
550        handler = self.start_https_server(certfile=CERT_localhost)
551        # Good cert
552        data = self.urlopen("https://localhost:%s/bizarre" % handler.port,
553                            cafile=CERT_localhost)
554        self.assertEqual(data, b"we care a bit")
555        # Bad cert
556        with self.assertRaises(urllib2.URLError):
557            self.urlopen("https://localhost:%s/bizarre" % handler.port,
558                         cafile=CERT_fakehostname)
559        # Good cert, but mismatching hostname
560        handler = self.start_https_server(certfile=CERT_fakehostname)
561        with self.assertRaises(ssl.CertificateError):
562            self.urlopen("https://localhost:%s/bizarre" % handler.port,
563                         cafile=CERT_fakehostname)
564
565    def test_https_with_cadefault(self):
566        handler = self.start_https_server(certfile=CERT_localhost)
567        # Self-signed cert should fail verification with system certificate store
568        with self.assertRaises(urllib2.URLError):
569            self.urlopen("https://localhost:%s/bizarre" % handler.port,
570                         cadefault=True)
571
572    def test_https_sni(self):
573        if ssl is None:
574            self.skipTest("ssl module required")
575        if not ssl.HAS_SNI:
576            self.skipTest("SNI support required in OpenSSL")
577        sni_name = [None]
578        def cb_sni(ssl_sock, server_name, initial_context):
579            sni_name[0] = server_name
580        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
581        context.set_servername_callback(cb_sni)
582        handler = self.start_https_server(context=context, certfile=CERT_localhost)
583        context = ssl.create_default_context(cafile=CERT_localhost)
584        self.urlopen("https://localhost:%s" % handler.port, context=context)
585        self.assertEqual(sni_name[0], "localhost")
586
587    def test_sending_headers(self):
588        handler = self.start_server([(200, [], "we don't care")])
589
590        req = urllib2.Request("http://localhost:%s/" % handler.port,
591                              headers={'Range': 'bytes=20-39'})
592        urllib2.urlopen(req)
593        self.assertEqual(handler.headers_received['Range'], 'bytes=20-39')
594
595    def test_basic(self):
596        handler = self.start_server([(200, [], "we don't care")])
597
598        open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
599        for attr in ("read", "close", "info", "geturl"):
600            self.assertTrue(hasattr(open_url, attr), "object returned from "
601                         "urlopen lacks the %s attribute" % attr)
602        try:
603            self.assertTrue(open_url.read(), "calling 'read' failed")
604        finally:
605            open_url.close()
606
607    def test_info(self):
608        handler = self.start_server([(200, [], "we don't care")])
609
610        open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
611        info_obj = open_url.info()
612        self.assertIsInstance(info_obj, mimetools.Message,
613                              "object returned by 'info' is not an "
614                              "instance of mimetools.Message")
615        self.assertEqual(info_obj.getsubtype(), "plain")
616
617    def test_geturl(self):
618        # Make sure same URL as opened is returned by geturl.
619        handler = self.start_server([(200, [], "we don't care")])
620
621        open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
622        url = open_url.geturl()
623        self.assertEqual(url, "http://localhost:%s" % handler.port)
624
625
626    def test_bad_address(self):
627        # Make sure proper exception is raised when connecting to a bogus
628        # address.
629
630        # as indicated by the comment below, this might fail with some ISP,
631        # so we run the test only when -unetwork/-uall is specified to
632        # mitigate the problem a bit (see #17564)
633        test_support.requires('network')
634        self.assertRaises(IOError,
635                          # Given that both VeriSign and various ISPs have in
636                          # the past or are presently hijacking various invalid
637                          # domain name requests in an attempt to boost traffic
638                          # to their own sites, finding a domain name to use
639                          # for this test is difficult.  RFC2606 leads one to
640                          # believe that '.invalid' should work, but experience
641                          # seemed to indicate otherwise.  Single character
642                          # TLDs are likely to remain invalid, so this seems to
643                          # be the best choice. The trailing '.' prevents a
644                          # related problem: The normal DNS resolver appends
645                          # the domain names from the search path if there is
646                          # no '.' the end and, and if one of those domains
647                          # implements a '*' rule a result is returned.
648                          # However, none of this will prevent the test from
649                          # failing if the ISP hijacks all invalid domain
650                          # requests.  The real solution would be to be able to
651                          # parameterize the framework with a mock resolver.
652                          urllib2.urlopen, "http://sadflkjsasf.i.nvali.d./")
653
654    def test_iteration(self):
655        expected_response = "pycon 2008..."
656        handler = self.start_server([(200, [], expected_response)])
657
658        data = urllib2.urlopen("http://localhost:%s" % handler.port)
659        for line in data:
660            self.assertEqual(line, expected_response)
661
662    def ztest_line_iteration(self):
663        lines = ["We\n", "got\n", "here\n", "verylong " * 8192 + "\n"]
664        expected_response = "".join(lines)
665        handler = self.start_server([(200, [], expected_response)])
666        data = urllib2.urlopen("http://localhost:%s" % handler.port)
667        for index, line in enumerate(data):
668            self.assertEqual(line, lines[index],
669                             "Fetched line number %s doesn't match expected:\n"
670                             "    Expected length was %s, got %s" %
671                             (index, len(lines[index]), len(line)))
672        self.assertEqual(index + 1, len(lines))
673
674def test_main():
675    # We will NOT depend on the network resource flag
676    # (Lib/test/regrtest.py -u network) since all tests here are only
677    # localhost.  However, if this is a bad rationale, then uncomment
678    # the next line.
679    #test_support.requires("network")
680
681    test_support.run_unittest(BasicAuthTests, ProxyAuthTests, TestUrlopen)
682
683if __name__ == "__main__":
684    test_main()
685