# Tests for urllib2 against local loopback HTTP servers (no external network).
1#!/usr/bin/env python
2
3import urlparse
4import urllib2
5import BaseHTTPServer
6import unittest
7import hashlib
8from test import test_support
9mimetools = test_support.import_module('mimetools', deprecated=True)
10threading = test_support.import_module('threading')
11
12# Loopback http server infrastructure
13
class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
    """HTTP server tweaked for loopback testing.

    The listening socket gets a short timeout so the serving thread can
    notice a stop request quickly, and each accepted connection gets a
    longer timeout to make deadlocks less likely to hang the test run.
    """

    def __init__(self, server_address, RequestHandlerClass):
        BaseHTTPServer.HTTPServer.__init__(self,
                                           server_address,
                                           RequestHandlerClass)
        # Keep accept() from blocking for long so the server loop can be
        # stopped easily.
        self.socket.settimeout(1.0)

    def get_request(self):
        """Accept a connection (overrides BaseHTTPServer.HTTPServer)."""
        conn, client_address = self.socket.accept()
        # It's a loopback connection, so a generous per-connection timeout
        # shouldn't affect anything, but should make deadlocks less likely.
        conn.settimeout(10.0)
        return conn, client_address
39
class LoopbackHttpServerThread(threading.Thread):
    """Stoppable thread that runs a loopback http server.

    Callers should start() the thread, wait on the ``ready`` event, talk
    to the server on ``self.port``, then call stop().
    """

    def __init__(self, request_handler):
        threading.Thread.__init__(self)
        # Named _stop_server (not _stop) because threading.Thread defines a
        # private _stop() method of its own; shadowing it with a bool would
        # break Thread.join() and interpreter-shutdown bookkeeping.
        self._stop_server = False
        self.ready = threading.Event()
        request_handler.protocol_version = "HTTP/1.0"
        self.httpd = LoopbackHttpServer(('127.0.0.1', 0),
                                        request_handler)
        #print "Serving HTTP on %s port %s" % (self.httpd.server_name,
        #                                      self.httpd.server_port)
        # Port 0 above means "pick a free port"; expose the real one.
        self.port = self.httpd.server_port

    def stop(self):
        """Stops the webserver if it's currently running."""

        # Set the stop flag.
        self._stop_server = True

        self.join()

    def run(self):
        # Signal readiness before serving so callers don't race the bind.
        self.ready.set()
        while not self._stop_server:
            # handle_request() returns at least once a second thanks to the
            # listening socket's timeout, so the stop flag is polled often.
            self.httpd.handle_request()
66
67# Authentication infrastructure
68
class DigestAuthHandler:
    """Performs HTTP digest authentication (RFC 2617 style) on behalf of a
    BaseHTTPRequestHandler acting as a proxy.
    """

    def __init__(self):
        self._request_num = 0
        self._nonces = []
        self._users = {}
        self._realm_name = "Test Realm"
        self._qop = "auth"

    def set_qop(self, qop):
        self._qop = qop

    def set_users(self, users):
        assert isinstance(users, dict)
        self._users = users

    def set_realm(self, realm):
        self._realm_name = realm

    def _generate_nonce(self):
        # Nonces are hashes of a monotonically increasing counter; each is
        # remembered so it can be validated (and consumed) exactly once.
        self._request_num += 1
        nonce = hashlib.md5(str(self._request_num)).hexdigest()
        self._nonces.append(nonce)
        return nonce

    def _create_auth_dict(self, auth_str):
        # Drop the leading scheme token ("Digest ...") and parse the rest
        # as comma-separated name=value pairs; quoted values lose their
        # quotes, bare values are whitespace-stripped.
        auth_str = auth_str[auth_str.find(" ") + 1:]
        auth_dict = {}
        for part in auth_str.split(","):
            name, value = part.split("=")
            name = name.strip()
            if value[0] == '"' and value[-1] == '"':
                value = value[1:-1]
            else:
                value = value.strip()
            auth_dict[name] = value
        return auth_dict

    def _validate_auth(self, auth_dict, password, method, uri):
        # Recompute the digest response from the client's parameters plus
        # our knowledge of the password, and compare.
        final_dict = dict(auth_dict)
        final_dict["password"] = password
        final_dict["method"] = method
        final_dict["uri"] = uri
        HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
        final_dict["HA1"] = hashlib.md5(HA1_str).hexdigest()
        HA2_str = "%(method)s:%(uri)s" % final_dict
        final_dict["HA2"] = hashlib.md5(HA2_str).hexdigest()
        response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
                       "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
        response = hashlib.md5(response_str).hexdigest()
        return response == auth_dict["response"]

    def _return_auth_challenge(self, request_handler):
        # Send a 407 with a fresh digest challenge.  Always returns False
        # so callers can "return self._return_auth_challenge(...)".
        request_handler.send_response(407, "Proxy Authentication Required")
        request_handler.send_header("Content-Type", "text/html")
        request_handler.send_header(
            'Proxy-Authenticate', 'Digest realm="%s", '
            'qop="%s",'
            'nonce="%s", ' % \
            (self._realm_name, self._qop, self._generate_nonce()))
        # XXX: Not sure if we're supposed to add this next header or
        # not.
        #request_handler.send_header('Connection', 'close')
        request_handler.end_headers()
        request_handler.wfile.write("Proxy Authentication Required.")
        return False

    def handle_request(self, request_handler):
        """Performs digest authentication on the given HTTP request
        handler.  Returns True if authentication was successful, False
        otherwise.

        If no users have been set, then digest auth is effectively
        disabled and this method will always return True.
        """
        if not self._users:
            return True

        if 'Proxy-Authorization' not in request_handler.headers:
            return self._return_auth_challenge(request_handler)

        auth_dict = self._create_auth_dict(
            request_handler.headers['Proxy-Authorization'])
        if auth_dict["username"] not in self._users:
            return self._return_auth_challenge(request_handler)
        password = self._users[auth_dict["username"]]
        if auth_dict.get("nonce") not in self._nonces:
            return self._return_auth_challenge(request_handler)
        # A nonce may be used at most once.
        self._nonces.remove(auth_dict["nonce"])

        # MSIE uses short_path in its validation, but Python's
        # urllib2 uses the full path, so we're going to see if
        # either of them works here.
        auth_validated = False
        for path in (request_handler.path, request_handler.short_path):
            if self._validate_auth(auth_dict,
                                   password,
                                   request_handler.command,
                                   path):
                auth_validated = True

        if not auth_validated:
            return self._return_auth_challenge(request_handler)
        return True
188
189# Proxy test infrastructure
190
class FakeProxyHandler(BaseHTTPServer.BaseHTTPRequestHandler):
    """This is a 'fake proxy' that makes it look like the entire
    internet has gone down due to a sudden zombie invasion.  Its main
    utility is in providing us with authentication support for
    testing.
    """

    def __init__(self, digest_auth_handler, *args, **kwargs):
        # This has to be set before calling our parent's __init__(), which will
        # try to call do_GET().
        self.digest_auth_handler = digest_auth_handler
        BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)

    def log_message(self, format, *args):
        # Uncomment the next line for debugging.
        #sys.stderr.write(format % args)
        pass

    def do_GET(self):
        # Stash just the path component; the digest handler tries it as an
        # alternative (MSIE-style) URI when validating.
        self.short_path = urlparse.urlparse(self.path, 'http')[2]
        if not self.digest_auth_handler.handle_request(self):
            # handle_request already sent the 407 challenge.
            return
        self.send_response(200, "OK")
        self.send_header("Content-Type", "text/html")
        self.end_headers()
        self.wfile.write("You've reached %s!<BR>" % self.path)
        self.wfile.write("Our apologies, but our server is down due to "
                          "a sudden zombie invasion.")
220
221# Test cases
222
class BaseTestCase(unittest.TestCase):
    """Shared fixture: record live threads in setUp and reap any that the
    test leaked in tearDown.
    """

    def setUp(self):
        self._threads = test_support.threading_setup()

    def tearDown(self):
        test_support.threading_cleanup(*self._threads)
229
230
class ProxyAuthTests(BaseTestCase):
    """Exercise urllib2's ProxyDigestAuthHandler against the fake proxy."""

    URL = "http://localhost"

    USER = "tester"
    PASSWD = "test123"
    REALM = "TestRealm"

    def setUp(self):
        super(ProxyAuthTests, self).setUp()
        # Stand up a digest-authenticating fake proxy on a free port.
        self.digest_auth_handler = DigestAuthHandler()
        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
        self.digest_auth_handler.set_realm(self.REALM)

        def create_fake_proxy_handler(*args, **kwargs):
            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
        self.server.start()
        self.server.ready.wait()

        # Point an opener at the proxy, with digest-auth support.
        proxy_url = "http://127.0.0.1:%d" % self.server.port
        proxy_handler = urllib2.ProxyHandler({"http" : proxy_url})
        self.proxy_digest_handler = urllib2.ProxyDigestAuthHandler()
        self.opener = urllib2.build_opener(proxy_handler,
                                           self.proxy_digest_handler)

    def tearDown(self):
        self.server.stop()
        super(ProxyAuthTests, self).tearDown()

    def test_proxy_with_bad_password_raises_httperror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD+"bad")
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_with_no_password_raises_httperror(self):
        self.digest_auth_handler.set_qop("auth")
        self.assertRaises(urllib2.HTTPError,
                          self.opener.open,
                          self.URL)

    def test_proxy_qop_auth_works(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth")
        response = self.opener.open(self.URL)
        # Drain the body, then close.
        while response.read():
            pass
        response.close()

    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth-int")
        try:
            response = self.opener.open(self.URL)
        except urllib2.URLError:
            # It's okay if we don't support auth-int, but we certainly
            # shouldn't receive any kind of exception here other than
            # a URLError.
            response = None
        if response:
            while response.read():
                pass
            response.close()
296
297
def GetRequestHandler(responses):
    """Build a request-handler class that replays canned *responses*.

    Each entry of *responses* is a (status_code, headers, body) triple and
    one entry is consumed per request.  Requested paths and POST bodies are
    accumulated on the class-level ``requests`` list, and the headers of
    the most recent request are kept in ``headers_received``.
    """

    class FakeHTTPRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):

        server_version = "TestHTTP/"
        requests = []
        headers_received = []
        port = 80

        def do_GET(self):
            content = self.send_head()
            if content:
                self.wfile.write(content)

        def do_POST(self):
            # Record the POST body, then answer exactly like a GET.
            length = self.headers['Content-Length']
            posted = self.rfile.read(int(length))
            self.do_GET()
            self.requests.append(posted)

        def send_head(self):
            FakeHTTPRequestHandler.headers_received = self.headers
            self.requests.append(self.path)
            status, header_list, content = responses.pop(0)

            self.send_response(status)

            for name, value in header_list:
                # Header values may embed a %s placeholder for the port.
                self.send_header(name, value % self.port)
            if content:
                self.send_header('Content-type', 'text/plain')
                self.end_headers()
                return content
            self.end_headers()

        def log_message(self, *args):
            pass

    return FakeHTTPRequestHandler
338
339
class TestUrlopen(BaseTestCase):
    """Tests urllib2.urlopen using the network.

    These tests are not exhaustive.  Assuming that testing using files does a
    good job overall of some of the basic interface features.  There are no
    tests exercising the optional 'data' and 'proxies' arguments.  No tests
    for transparent redirection have been written.
    """

    def start_server(self, responses):
        """Serve *responses* from a background loopback thread and return
        the handler class (its ``port`` attribute is set to the real port).
        """
        handler = GetRequestHandler(responses)

        self.server = LoopbackHttpServerThread(handler)
        self.server.start()
        self.server.ready.wait()
        handler.port = self.server.port
        return handler

    def test_redirection(self):
        expected_response = 'We got here...'
        canned = [
            (302, [('Location', 'http://localhost:%s/somewhere_else')], ''),
            (200, [], expected_response),
        ]

        handler = self.start_server(canned)

        try:
            resp = urllib2.urlopen('http://localhost:%s/' % handler.port)
            body = resp.read()
            resp.close()

            self.assertEqual(body, expected_response)
            # Both the original and the redirected path were requested.
            self.assertEqual(handler.requests, ['/', '/somewhere_else'])
        finally:
            self.server.stop()

    def test_404(self):
        expected_response = 'Bad bad bad...'
        handler = self.start_server([(404, [], expected_response)])

        try:
            try:
                urllib2.urlopen('http://localhost:%s/weeble' % handler.port)
            except urllib2.URLError as err:
                pass
            else:
                self.fail('404 should raise URLError')

            # The HTTPError is itself file-like: the error body is readable.
            body = err.read()
            err.close()

            self.assertEqual(body, expected_response)
            self.assertEqual(handler.requests, ['/weeble'])
        finally:
            self.server.stop()

    def test_200(self):
        expected_response = 'pycon 2008...'
        handler = self.start_server([(200, [], expected_response)])

        try:
            resp = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port)
            body = resp.read()
            resp.close()

            self.assertEqual(body, expected_response)
            self.assertEqual(handler.requests, ['/bizarre'])
        finally:
            self.server.stop()

    def test_200_with_parameters(self):
        expected_response = 'pycon 2008...'
        handler = self.start_server([(200, [], expected_response)])

        try:
            # The second argument makes this a POST; the body is recorded
            # by the fake handler after the path.
            resp = urllib2.urlopen('http://localhost:%s/bizarre' % handler.port,
                                   'get=with_feeling')
            body = resp.read()
            resp.close()

            self.assertEqual(body, expected_response)
            self.assertEqual(handler.requests, ['/bizarre', 'get=with_feeling'])
        finally:
            self.server.stop()

    def test_sending_headers(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            req = urllib2.Request("http://localhost:%s/" % handler.port,
                                  headers={'Range': 'bytes=20-39'})
            urllib2.urlopen(req)
            self.assertEqual(handler.headers_received['Range'], 'bytes=20-39')
        finally:
            self.server.stop()

    def test_basic(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            # The returned object must expose the file-like interface.
            for attr in ("read", "close", "info", "geturl"):
                self.assertTrue(hasattr(open_url, attr), "object returned from "
                             "urlopen lacks the %s attribute" % attr)
            try:
                self.assertTrue(open_url.read(), "calling 'read' failed")
            finally:
                open_url.close()
        finally:
            self.server.stop()

    def test_info(self):
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            info_obj = open_url.info()
            self.assertIsInstance(info_obj, mimetools.Message,
                                  "object returned by 'info' is not an "
                                  "instance of mimetools.Message")
            # Content-type was text/plain, so the subtype is "plain".
            self.assertEqual(info_obj.getsubtype(), "plain")
        finally:
            self.server.stop()

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        handler = self.start_server([(200, [], "we don't care")])

        try:
            open_url = urllib2.urlopen("http://localhost:%s" % handler.port)
            self.assertEqual(open_url.geturl(),
                             "http://localhost:%s" % handler.port)
        finally:
            self.server.stop()

    def test_bad_address(self):
        # Make sure proper exception is raised when connecting to a bogus
        # address.
        #
        # Given that both VeriSign and various ISPs have in the past or are
        # presently hijacking various invalid domain name requests in an
        # attempt to boost traffic to their own sites, finding a domain name
        # to use for this test is difficult.  RFC2606 leads one to believe
        # that '.invalid' should work, but experience seemed to indicate
        # otherwise.  Single character TLDs are likely to remain invalid, so
        # this seems to be the best choice. The trailing '.' prevents a
        # related problem: The normal DNS resolver appends the domain names
        # from the search path if there is no '.' the end and, and if one of
        # those domains implements a '*' rule a result is returned.  However,
        # none of this will prevent the test from failing if the ISP hijacks
        # all invalid domain requests.  The real solution would be to be able
        # to parameterize the framework with a mock resolver.
        self.assertRaises(IOError,
                          urllib2.urlopen, "http://sadflkjsasf.i.nvali.d./")

    def test_iteration(self):
        expected_response = "pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        try:
            data = urllib2.urlopen("http://localhost:%s" % handler.port)
            # The body has no newline, so iteration yields it whole.
            for line in data:
                self.assertEqual(line, expected_response)
        finally:
            self.server.stop()

    def ztest_line_iteration(self):
        # NOTE: the 'z' prefix keeps unittest from collecting this test.
        lines = ["We\n", "got\n", "here\n", "verylong " * 8192 + "\n"]
        expected_response = "".join(lines)
        handler = self.start_server([(200, [], expected_response)])
        try:
            data = urllib2.urlopen("http://localhost:%s" % handler.port)
            for index, line in enumerate(data):
                self.assertEqual(line, lines[index],
                                 "Fetched line number %s doesn't match expected:\n"
                                 "    Expected length was %s, got %s" %
                                 (index, len(lines[index]), len(line)))
        finally:
            self.server.stop()
        self.assertEqual(index + 1, len(lines))
528
def test_main():
    """Run both localhost-only test classes.

    We will NOT depend on the network resource flag
    (Lib/test/regrtest.py -u network) since all tests here are only
    localhost.  However, if this is a bad rationale, then uncomment
    the next line.
    """
    #test_support.requires("network")
    test_support.run_unittest(ProxyAuthTests, TestUrlopen)
537
# Support running this file directly as a script.
if __name__ == "__main__":
    test_main()
540