• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import base64
2import os
3import email
4import urllib.parse
5import urllib.request
6import http.server
7import threading
8import unittest
9import hashlib
10
11from test import support
12
13try:
14    import ssl
15except ImportError:
16    ssl = None
17
18here = os.path.dirname(__file__)
19# Self-signed cert file for 'localhost'
20CERT_localhost = os.path.join(here, 'keycert.pem')
21# Self-signed cert file for 'fakehostname'
22CERT_fakehostname = os.path.join(here, 'keycert2.pem')
23
24
25# Loopback http server infrastructure
26
27class LoopbackHttpServer(http.server.HTTPServer):
28    """HTTP server w/ a few modifications that make it useful for
29    loopback testing purposes.
30    """
31
32    def __init__(self, server_address, RequestHandlerClass):
33        http.server.HTTPServer.__init__(self,
34                                        server_address,
35                                        RequestHandlerClass)
36
37        # Set the timeout of our listening socket really low so
38        # that we can stop the server easily.
39        self.socket.settimeout(0.1)
40
41    def get_request(self):
42        """HTTPServer method, overridden."""
43
44        request, client_address = self.socket.accept()
45
46        # It's a loopback connection, so setting the timeout
47        # really low shouldn't affect anything, but should make
48        # deadlocks less likely to occur.
49        request.settimeout(10.0)
50
51        return (request, client_address)
52
53class LoopbackHttpServerThread(threading.Thread):
54    """Stoppable thread that runs a loopback http server."""
55
56    def __init__(self, request_handler):
57        threading.Thread.__init__(self)
58        self._stop_server = False
59        self.ready = threading.Event()
60        request_handler.protocol_version = "HTTP/1.0"
61        self.httpd = LoopbackHttpServer(("127.0.0.1", 0),
62                                        request_handler)
63        self.port = self.httpd.server_port
64
65    def stop(self):
66        """Stops the webserver if it's currently running."""
67
68        self._stop_server = True
69
70        self.join()
71        self.httpd.server_close()
72
73    def run(self):
74        self.ready.set()
75        while not self._stop_server:
76            self.httpd.handle_request()
77
78# Authentication infrastructure
79
80class DigestAuthHandler:
81    """Handler for performing digest authentication."""
82
83    def __init__(self):
84        self._request_num = 0
85        self._nonces = []
86        self._users = {}
87        self._realm_name = "Test Realm"
88        self._qop = "auth"
89
90    def set_qop(self, qop):
91        self._qop = qop
92
93    def set_users(self, users):
94        assert isinstance(users, dict)
95        self._users = users
96
97    def set_realm(self, realm):
98        self._realm_name = realm
99
100    def _generate_nonce(self):
101        self._request_num += 1
102        nonce = hashlib.md5(str(self._request_num).encode("ascii")).hexdigest()
103        self._nonces.append(nonce)
104        return nonce
105
106    def _create_auth_dict(self, auth_str):
107        first_space_index = auth_str.find(" ")
108        auth_str = auth_str[first_space_index+1:]
109
110        parts = auth_str.split(",")
111
112        auth_dict = {}
113        for part in parts:
114            name, value = part.split("=")
115            name = name.strip()
116            if value[0] == '"' and value[-1] == '"':
117                value = value[1:-1]
118            else:
119                value = value.strip()
120            auth_dict[name] = value
121        return auth_dict
122
123    def _validate_auth(self, auth_dict, password, method, uri):
124        final_dict = {}
125        final_dict.update(auth_dict)
126        final_dict["password"] = password
127        final_dict["method"] = method
128        final_dict["uri"] = uri
129        HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
130        HA1 = hashlib.md5(HA1_str.encode("ascii")).hexdigest()
131        HA2_str = "%(method)s:%(uri)s" % final_dict
132        HA2 = hashlib.md5(HA2_str.encode("ascii")).hexdigest()
133        final_dict["HA1"] = HA1
134        final_dict["HA2"] = HA2
135        response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
136                       "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
137        response = hashlib.md5(response_str.encode("ascii")).hexdigest()
138
139        return response == auth_dict["response"]
140
141    def _return_auth_challenge(self, request_handler):
142        request_handler.send_response(407, "Proxy Authentication Required")
143        request_handler.send_header("Content-Type", "text/html")
144        request_handler.send_header(
145            'Proxy-Authenticate', 'Digest realm="%s", '
146            'qop="%s",'
147            'nonce="%s", ' % \
148            (self._realm_name, self._qop, self._generate_nonce()))
149        # XXX: Not sure if we're supposed to add this next header or
150        # not.
151        #request_handler.send_header('Connection', 'close')
152        request_handler.end_headers()
153        request_handler.wfile.write(b"Proxy Authentication Required.")
154        return False
155
156    def handle_request(self, request_handler):
157        """Performs digest authentication on the given HTTP request
158        handler.  Returns True if authentication was successful, False
159        otherwise.
160
161        If no users have been set, then digest auth is effectively
162        disabled and this method will always return True.
163        """
164
165        if len(self._users) == 0:
166            return True
167
168        if "Proxy-Authorization" not in request_handler.headers:
169            return self._return_auth_challenge(request_handler)
170        else:
171            auth_dict = self._create_auth_dict(
172                request_handler.headers["Proxy-Authorization"]
173                )
174            if auth_dict["username"] in self._users:
175                password = self._users[ auth_dict["username"] ]
176            else:
177                return self._return_auth_challenge(request_handler)
178            if not auth_dict.get("nonce") in self._nonces:
179                return self._return_auth_challenge(request_handler)
180            else:
181                self._nonces.remove(auth_dict["nonce"])
182
183            auth_validated = False
184
185            # MSIE uses short_path in its validation, but Python's
186            # urllib.request uses the full path, so we're going to see if
187            # either of them works here.
188
189            for path in [request_handler.path, request_handler.short_path]:
190                if self._validate_auth(auth_dict,
191                                       password,
192                                       request_handler.command,
193                                       path):
194                    auth_validated = True
195
196            if not auth_validated:
197                return self._return_auth_challenge(request_handler)
198            return True
199
200
201class BasicAuthHandler(http.server.BaseHTTPRequestHandler):
202    """Handler for performing basic authentication."""
203    # Server side values
204    USER = 'testUser'
205    PASSWD = 'testPass'
206    REALM = 'Test'
207    USER_PASSWD = "%s:%s" % (USER, PASSWD)
208    ENCODED_AUTH = base64.b64encode(USER_PASSWD.encode('ascii')).decode('ascii')
209
210    def __init__(self, *args, **kwargs):
211        http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
212
213    def log_message(self, format, *args):
214        # Suppress console log message
215        pass
216
217    def do_HEAD(self):
218        self.send_response(200)
219        self.send_header("Content-type", "text/html")
220        self.end_headers()
221
222    def do_AUTHHEAD(self):
223        self.send_response(401)
224        self.send_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.REALM)
225        self.send_header("Content-type", "text/html")
226        self.end_headers()
227
228    def do_GET(self):
229        if not self.headers.get("Authorization", ""):
230            self.do_AUTHHEAD()
231            self.wfile.write(b"No Auth header received")
232        elif self.headers.get(
233                "Authorization", "") == "Basic " + self.ENCODED_AUTH:
234            self.send_response(200)
235            self.end_headers()
236            self.wfile.write(b"It works")
237        else:
238            # Request Unauthorized
239            self.do_AUTHHEAD()
240
241
242
243# Proxy test infrastructure
244
245class FakeProxyHandler(http.server.BaseHTTPRequestHandler):
246    """This is a 'fake proxy' that makes it look like the entire
247    internet has gone down due to a sudden zombie invasion.  It main
248    utility is in providing us with authentication support for
249    testing.
250    """
251
252    def __init__(self, digest_auth_handler, *args, **kwargs):
253        # This has to be set before calling our parent's __init__(), which will
254        # try to call do_GET().
255        self.digest_auth_handler = digest_auth_handler
256        http.server.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
257
258    def log_message(self, format, *args):
259        # Uncomment the next line for debugging.
260        # sys.stderr.write(format % args)
261        pass
262
263    def do_GET(self):
264        (scm, netloc, path, params, query, fragment) = urllib.parse.urlparse(
265            self.path, "http")
266        self.short_path = path
267        if self.digest_auth_handler.handle_request(self):
268            self.send_response(200, "OK")
269            self.send_header("Content-Type", "text/html")
270            self.end_headers()
271            self.wfile.write(bytes("You've reached %s!<BR>" % self.path,
272                                   "ascii"))
273            self.wfile.write(b"Our apologies, but our server is down due to "
274                             b"a sudden zombie invasion.")
275
276# Test cases
277
278class BasicAuthTests(unittest.TestCase):
279    USER = "testUser"
280    PASSWD = "testPass"
281    INCORRECT_PASSWD = "Incorrect"
282    REALM = "Test"
283
284    def setUp(self):
285        super(BasicAuthTests, self).setUp()
286        # With Basic Authentication
287        def http_server_with_basic_auth_handler(*args, **kwargs):
288            return BasicAuthHandler(*args, **kwargs)
289        self.server = LoopbackHttpServerThread(http_server_with_basic_auth_handler)
290        self.addCleanup(self.stop_server)
291        self.server_url = 'http://127.0.0.1:%s' % self.server.port
292        self.server.start()
293        self.server.ready.wait()
294
295    def stop_server(self):
296        self.server.stop()
297        self.server = None
298
299    def tearDown(self):
300        super(BasicAuthTests, self).tearDown()
301
302    def test_basic_auth_success(self):
303        ah = urllib.request.HTTPBasicAuthHandler()
304        ah.add_password(self.REALM, self.server_url, self.USER, self.PASSWD)
305        urllib.request.install_opener(urllib.request.build_opener(ah))
306        try:
307            self.assertTrue(urllib.request.urlopen(self.server_url))
308        except urllib.error.HTTPError:
309            self.fail("Basic auth failed for the url: %s" % self.server_url)
310
311    def test_basic_auth_httperror(self):
312        ah = urllib.request.HTTPBasicAuthHandler()
313        ah.add_password(self.REALM, self.server_url, self.USER, self.INCORRECT_PASSWD)
314        urllib.request.install_opener(urllib.request.build_opener(ah))
315        self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, self.server_url)
316
317
318class ProxyAuthTests(unittest.TestCase):
319    URL = "http://localhost"
320
321    USER = "tester"
322    PASSWD = "test123"
323    REALM = "TestRealm"
324
325    @support.requires_hashdigest("md5")
326    def setUp(self):
327        super(ProxyAuthTests, self).setUp()
328        # Ignore proxy bypass settings in the environment.
329        def restore_environ(old_environ):
330            os.environ.clear()
331            os.environ.update(old_environ)
332        self.addCleanup(restore_environ, os.environ.copy())
333        os.environ['NO_PROXY'] = ''
334        os.environ['no_proxy'] = ''
335
336        self.digest_auth_handler = DigestAuthHandler()
337        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
338        self.digest_auth_handler.set_realm(self.REALM)
339        # With Digest Authentication.
340        def create_fake_proxy_handler(*args, **kwargs):
341            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)
342
343        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
344        self.addCleanup(self.stop_server)
345        self.server.start()
346        self.server.ready.wait()
347        proxy_url = "http://127.0.0.1:%d" % self.server.port
348        handler = urllib.request.ProxyHandler({"http" : proxy_url})
349        self.proxy_digest_handler = urllib.request.ProxyDigestAuthHandler()
350        self.opener = urllib.request.build_opener(
351            handler, self.proxy_digest_handler)
352
353    def stop_server(self):
354        self.server.stop()
355        self.server = None
356
357    def test_proxy_with_bad_password_raises_httperror(self):
358        self.proxy_digest_handler.add_password(self.REALM, self.URL,
359                                               self.USER, self.PASSWD+"bad")
360        self.digest_auth_handler.set_qop("auth")
361        self.assertRaises(urllib.error.HTTPError,
362                          self.opener.open,
363                          self.URL)
364
365    def test_proxy_with_no_password_raises_httperror(self):
366        self.digest_auth_handler.set_qop("auth")
367        self.assertRaises(urllib.error.HTTPError,
368                          self.opener.open,
369                          self.URL)
370
371    def test_proxy_qop_auth_works(self):
372        self.proxy_digest_handler.add_password(self.REALM, self.URL,
373                                               self.USER, self.PASSWD)
374        self.digest_auth_handler.set_qop("auth")
375        with self.opener.open(self.URL) as result:
376            while result.read():
377                pass
378
379    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
380        self.proxy_digest_handler.add_password(self.REALM, self.URL,
381                                               self.USER, self.PASSWD)
382        self.digest_auth_handler.set_qop("auth-int")
383        try:
384            result = self.opener.open(self.URL)
385        except urllib.error.URLError:
386            # It's okay if we don't support auth-int, but we certainly
387            # shouldn't receive any kind of exception here other than
388            # a URLError.
389            pass
390        else:
391            with result:
392                while result.read():
393                    pass
394
395
396def GetRequestHandler(responses):
397
398    class FakeHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
399
400        server_version = "TestHTTP/"
401        requests = []
402        headers_received = []
403        port = 80
404
405        def do_GET(self):
406            body = self.send_head()
407            while body:
408                done = self.wfile.write(body)
409                body = body[done:]
410
411        def do_POST(self):
412            content_length = self.headers["Content-Length"]
413            post_data = self.rfile.read(int(content_length))
414            self.do_GET()
415            self.requests.append(post_data)
416
417        def send_head(self):
418            FakeHTTPRequestHandler.headers_received = self.headers
419            self.requests.append(self.path)
420            response_code, headers, body = responses.pop(0)
421
422            self.send_response(response_code)
423
424            for (header, value) in headers:
425                self.send_header(header, value % {'port':self.port})
426            if body:
427                self.send_header("Content-type", "text/plain")
428                self.end_headers()
429                return body
430            self.end_headers()
431
432        def log_message(self, *args):
433            pass
434
435
436    return FakeHTTPRequestHandler
437
438
439class TestUrlopen(unittest.TestCase):
440    """Tests urllib.request.urlopen using the network.
441
442    These tests are not exhaustive.  Assuming that testing using files does a
443    good job overall of some of the basic interface features.  There are no
444    tests exercising the optional 'data' and 'proxies' arguments.  No tests
445    for transparent redirection have been written.
446    """
447
448    def setUp(self):
449        super(TestUrlopen, self).setUp()
450
451        # Ignore proxies for localhost tests.
452        def restore_environ(old_environ):
453            os.environ.clear()
454            os.environ.update(old_environ)
455        self.addCleanup(restore_environ, os.environ.copy())
456        os.environ['NO_PROXY'] = '*'
457        os.environ['no_proxy'] = '*'
458
459    def urlopen(self, url, data=None, **kwargs):
460        l = []
461        f = urllib.request.urlopen(url, data, **kwargs)
462        try:
463            # Exercise various methods
464            l.extend(f.readlines(200))
465            l.append(f.readline())
466            l.append(f.read(1024))
467            l.append(f.read())
468        finally:
469            f.close()
470        return b"".join(l)
471
472    def stop_server(self):
473        self.server.stop()
474        self.server = None
475
476    def start_server(self, responses=None):
477        if responses is None:
478            responses = [(200, [], b"we don't care")]
479        handler = GetRequestHandler(responses)
480
481        self.server = LoopbackHttpServerThread(handler)
482        self.addCleanup(self.stop_server)
483        self.server.start()
484        self.server.ready.wait()
485        port = self.server.port
486        handler.port = port
487        return handler
488
489    def start_https_server(self, responses=None, **kwargs):
490        if not hasattr(urllib.request, 'HTTPSHandler'):
491            self.skipTest('ssl support required')
492        from test.ssl_servers import make_https_server
493        if responses is None:
494            responses = [(200, [], b"we care a bit")]
495        handler = GetRequestHandler(responses)
496        server = make_https_server(self, handler_class=handler, **kwargs)
497        handler.port = server.port
498        return handler
499
500    def test_redirection(self):
501        expected_response = b"We got here..."
502        responses = [
503            (302, [("Location", "http://localhost:%(port)s/somewhere_else")],
504             ""),
505            (200, [], expected_response)
506        ]
507
508        handler = self.start_server(responses)
509        data = self.urlopen("http://localhost:%s/" % handler.port)
510        self.assertEqual(data, expected_response)
511        self.assertEqual(handler.requests, ["/", "/somewhere_else"])
512
513    def test_chunked(self):
514        expected_response = b"hello world"
515        chunked_start = (
516                        b'a\r\n'
517                        b'hello worl\r\n'
518                        b'1\r\n'
519                        b'd\r\n'
520                        b'0\r\n'
521                        )
522        response = [(200, [("Transfer-Encoding", "chunked")], chunked_start)]
523        handler = self.start_server(response)
524        data = self.urlopen("http://localhost:%s/" % handler.port)
525        self.assertEqual(data, expected_response)
526
527    def test_404(self):
528        expected_response = b"Bad bad bad..."
529        handler = self.start_server([(404, [], expected_response)])
530
531        try:
532            self.urlopen("http://localhost:%s/weeble" % handler.port)
533        except urllib.error.URLError as f:
534            data = f.read()
535            f.close()
536        else:
537            self.fail("404 should raise URLError")
538
539        self.assertEqual(data, expected_response)
540        self.assertEqual(handler.requests, ["/weeble"])
541
542    def test_200(self):
543        expected_response = b"pycon 2008..."
544        handler = self.start_server([(200, [], expected_response)])
545        data = self.urlopen("http://localhost:%s/bizarre" % handler.port)
546        self.assertEqual(data, expected_response)
547        self.assertEqual(handler.requests, ["/bizarre"])
548
549    def test_200_with_parameters(self):
550        expected_response = b"pycon 2008..."
551        handler = self.start_server([(200, [], expected_response)])
552        data = self.urlopen("http://localhost:%s/bizarre" % handler.port,
553                             b"get=with_feeling")
554        self.assertEqual(data, expected_response)
555        self.assertEqual(handler.requests, ["/bizarre", b"get=with_feeling"])
556
557    def test_https(self):
558        handler = self.start_https_server()
559        context = ssl.create_default_context(cafile=CERT_localhost)
560        data = self.urlopen("https://localhost:%s/bizarre" % handler.port, context=context)
561        self.assertEqual(data, b"we care a bit")
562
563    def test_https_with_cafile(self):
564        handler = self.start_https_server(certfile=CERT_localhost)
565        with support.check_warnings(('', DeprecationWarning)):
566            # Good cert
567            data = self.urlopen("https://localhost:%s/bizarre" % handler.port,
568                                cafile=CERT_localhost)
569            self.assertEqual(data, b"we care a bit")
570            # Bad cert
571            with self.assertRaises(urllib.error.URLError) as cm:
572                self.urlopen("https://localhost:%s/bizarre" % handler.port,
573                             cafile=CERT_fakehostname)
574            # Good cert, but mismatching hostname
575            handler = self.start_https_server(certfile=CERT_fakehostname)
576            with self.assertRaises(urllib.error.URLError) as cm:
577                self.urlopen("https://localhost:%s/bizarre" % handler.port,
578                             cafile=CERT_fakehostname)
579
580    def test_https_with_cadefault(self):
581        handler = self.start_https_server(certfile=CERT_localhost)
582        # Self-signed cert should fail verification with system certificate store
583        with support.check_warnings(('', DeprecationWarning)):
584            with self.assertRaises(urllib.error.URLError) as cm:
585                self.urlopen("https://localhost:%s/bizarre" % handler.port,
586                             cadefault=True)
587
588    def test_https_sni(self):
589        if ssl is None:
590            self.skipTest("ssl module required")
591        if not ssl.HAS_SNI:
592            self.skipTest("SNI support required in OpenSSL")
593        sni_name = None
594        def cb_sni(ssl_sock, server_name, initial_context):
595            nonlocal sni_name
596            sni_name = server_name
597        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
598        context.set_servername_callback(cb_sni)
599        handler = self.start_https_server(context=context, certfile=CERT_localhost)
600        context = ssl.create_default_context(cafile=CERT_localhost)
601        self.urlopen("https://localhost:%s" % handler.port, context=context)
602        self.assertEqual(sni_name, "localhost")
603
604    def test_sending_headers(self):
605        handler = self.start_server()
606        req = urllib.request.Request("http://localhost:%s/" % handler.port,
607                                     headers={"Range": "bytes=20-39"})
608        with urllib.request.urlopen(req):
609            pass
610        self.assertEqual(handler.headers_received["Range"], "bytes=20-39")
611
612    def test_basic(self):
613        handler = self.start_server()
614        with urllib.request.urlopen("http://localhost:%s" % handler.port) as open_url:
615            for attr in ("read", "close", "info", "geturl"):
616                self.assertTrue(hasattr(open_url, attr), "object returned from "
617                             "urlopen lacks the %s attribute" % attr)
618            self.assertTrue(open_url.read(), "calling 'read' failed")
619
620    def test_info(self):
621        handler = self.start_server()
622        open_url = urllib.request.urlopen(
623            "http://localhost:%s" % handler.port)
624        with open_url:
625            info_obj = open_url.info()
626        self.assertIsInstance(info_obj, email.message.Message,
627                              "object returned by 'info' is not an "
628                              "instance of email.message.Message")
629        self.assertEqual(info_obj.get_content_subtype(), "plain")
630
631    def test_geturl(self):
632        # Make sure same URL as opened is returned by geturl.
633        handler = self.start_server()
634        open_url = urllib.request.urlopen("http://localhost:%s" % handler.port)
635        with open_url:
636            url = open_url.geturl()
637        self.assertEqual(url, "http://localhost:%s" % handler.port)
638
639    def test_iteration(self):
640        expected_response = b"pycon 2008..."
641        handler = self.start_server([(200, [], expected_response)])
642        data = urllib.request.urlopen("http://localhost:%s" % handler.port)
643        for line in data:
644            self.assertEqual(line, expected_response)
645
646    def test_line_iteration(self):
647        lines = [b"We\n", b"got\n", b"here\n", b"verylong " * 8192 + b"\n"]
648        expected_response = b"".join(lines)
649        handler = self.start_server([(200, [], expected_response)])
650        data = urllib.request.urlopen("http://localhost:%s" % handler.port)
651        for index, line in enumerate(data):
652            self.assertEqual(line, lines[index],
653                             "Fetched line number %s doesn't match expected:\n"
654                             "    Expected length was %s, got %s" %
655                             (index, len(lines[index]), len(line)))
656        self.assertEqual(index + 1, len(lines))
657
658
659threads_key = None
660
661def setUpModule():
662    # Store the threading_setup in a key and ensure that it is cleaned up
663    # in the tearDown
664    global threads_key
665    threads_key = support.threading_setup()
666
667def tearDownModule():
668    if threads_key:
669        support.threading_cleanup(*threads_key)
670
671if __name__ == "__main__":
672    unittest.main()
673