• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import base64
2import os
3import email
4import urllib.parse
5import urllib.request
6import http.server
7import threading
8import unittest
9import hashlib
10
11from test import support
12from test.support import hashlib_helper
13
14try:
15    import ssl
16except ImportError:
17    ssl = None
18
# Directory that holds this module; the certificate files named below are
# resolved relative to it.
here = os.path.dirname(__file__)
# Self-signed cert file for 'localhost'
CERT_localhost = os.path.join(here, 'keycert.pem')
# Self-signed cert file for 'fakehostname'
CERT_fakehostname = os.path.join(here, 'keycert2.pem')
24
25
26# Loopback http server infrastructure
27
class LoopbackHttpServer(http.server.HTTPServer):
    """HTTPServer variant tuned for loopback test traffic.

    The listening socket gets a 0.1 second timeout and every accepted
    connection gets a 10 second timeout.
    """

    def __init__(self, server_address, RequestHandlerClass):
        super().__init__(server_address, RequestHandlerClass)

        # Keep waits on the listening socket very short so that the
        # server can be stopped easily.
        self.socket.settimeout(0.1)

    def get_request(self):
        """Accept one connection and return ``(socket, client_address)``.

        Overrides the HTTPServer method of the same name; the accepted
        socket is given a 10 second timeout before it is handed back.
        """
        connection, peer = self.socket.accept()

        # Only loopback connections are expected, so a bounded timeout
        # shouldn't affect normal exchanges, but should make deadlocks
        # less likely to occur.
        connection.settimeout(10.0)
        return connection, peer
53
class LoopbackHttpServerThread(threading.Thread):
    """Thread that serves a loopback http server until told to stop."""

    def __init__(self, request_handler):
        super().__init__()
        self.ready = threading.Event()
        self._stop_server = False
        # The attribute is set on the handler object that was passed in.
        request_handler.protocol_version = "HTTP/1.0"
        # Port 0 is requested; the port actually in use is read back from
        # the server and published as self.port.
        server = LoopbackHttpServer(("127.0.0.1", 0), request_handler)
        self.httpd = server
        self.port = server.server_port

    def stop(self):
        """Flag the serving loop to finish, wait for the thread to end,
        then close the server."""
        self._stop_server = True
        self.join()
        self.httpd.server_close()

    def run(self):
        """Set the ``ready`` event, then handle one request at a time
        until the stop flag is raised."""
        self.ready.set()
        while True:
            if self._stop_server:
                break
            self.httpd.handle_request()
78
79# Authentication infrastructure
80
class DigestAuthHandler:
    """Handler for performing digest authentication.

    The handler issues ``Proxy-Authenticate`` digest challenges (status
    407) and validates the ``Proxy-Authorization`` header of follow-up
    requests against a username -> password mapping.  Every nonce handed
    out is remembered and accepted for a single request only.
    """

    def __init__(self):
        self._request_num = 0
        self._nonces = []
        self._users = {}
        self._realm_name = "Test Realm"
        self._qop = "auth"

    def set_qop(self, qop):
        """Set the qop value advertised in subsequent challenges."""
        self._qop = qop

    def set_users(self, users):
        """Set the dict mapping user names to passwords."""
        assert isinstance(users, dict)
        self._users = users

    def set_realm(self, realm):
        """Set the realm name advertised in subsequent challenges."""
        self._realm_name = realm

    def _generate_nonce(self):
        """Create, remember and return a new nonce.

        The nonce is the MD5 hex digest of an incrementing request counter.
        """
        self._request_num += 1
        nonce = hashlib.md5(str(self._request_num).encode("ascii")).hexdigest()
        self._nonces.append(nonce)
        return nonce

    def _create_auth_dict(self, auth_str):
        """Parse an authorization header value into a name -> value dict.

        Everything up to and including the first space (the scheme) is
        dropped, the remainder is split on commas and each element on its
        first '='.  Surrounding whitespace and one pair of enclosing double
        quotes are removed from values.  Elements that are empty or carry
        no '=' are ignored.  Commas inside quoted values are not supported.
        """
        first_space_index = auth_str.find(" ")
        auth_str = auth_str[first_space_index+1:]

        auth_dict = {}
        for part in auth_str.split(","):
            # Split on the first '=' only: values such as a URI with a
            # query string legitimately contain further '=' characters.
            name, separator, value = part.partition("=")
            name = name.strip()
            if not separator or not name:
                continue
            value = value.strip()
            # Slices (not indexes) so that an empty value cannot raise.
            if value[:1] == '"' and value[-1:] == '"':
                value = value[1:-1]
            auth_dict[name] = value
        return auth_dict

    def _validate_auth(self, auth_dict, password, method, uri):
        """Return True if auth_dict["response"] matches the digest computed
        from the other fields plus *password*, *method* and *uri*.

        A dict lacking any field needed for the computation (username,
        realm, nonce, nc, cnonce, qop) is treated as invalid and yields
        False.  *auth_dict* itself is not modified.
        """
        final_dict = dict(auth_dict)
        final_dict["password"] = password
        final_dict["method"] = method
        final_dict["uri"] = uri
        try:
            HA1_str = "%(username)s:%(realm)s:%(password)s" % final_dict
            HA1 = hashlib.md5(HA1_str.encode("ascii")).hexdigest()
            HA2_str = "%(method)s:%(uri)s" % final_dict
            HA2 = hashlib.md5(HA2_str.encode("ascii")).hexdigest()
            final_dict["HA1"] = HA1
            final_dict["HA2"] = HA2
            response_str = "%(HA1)s:%(nonce)s:%(nc)s:" \
                           "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict
        except KeyError:
            # Incomplete credentials cannot be valid.
            return False
        response = hashlib.md5(response_str.encode("ascii")).hexdigest()

        return response == auth_dict.get("response")

    def _return_auth_challenge(self, request_handler):
        """Send a 407 response carrying a fresh digest challenge.

        Always returns False so callers can return the result directly.
        """
        request_handler.send_response(407, "Proxy Authentication Required")
        request_handler.send_header("Content-Type", "text/html")
        # The header text (including its trailing separator) is kept
        # exactly as it has always been sent.
        request_handler.send_header(
            'Proxy-Authenticate', 'Digest realm="%s", '
            'qop="%s",'
            'nonce="%s", ' %
            (self._realm_name, self._qop, self._generate_nonce()))
        # XXX: Not sure if we're supposed to add this next header or
        # not.
        #request_handler.send_header('Connection', 'close')
        request_handler.end_headers()
        request_handler.wfile.write(b"Proxy Authentication Required.")
        return False

    def handle_request(self, request_handler):
        """Performs digest authentication on the given HTTP request
        handler.  Returns True if authentication was successful, False
        otherwise.

        If no users have been set, then digest auth is effectively
        disabled and this method will always return True.

        Whenever False is returned a 407 challenge has already been sent
        through *request_handler*; this includes requests whose
        Proxy-Authorization header is incomplete or malformed.
        """

        if not self._users:
            return True

        if "Proxy-Authorization" not in request_handler.headers:
            return self._return_auth_challenge(request_handler)

        auth_dict = self._create_auth_dict(
            request_handler.headers["Proxy-Authorization"]
            )

        username = auth_dict.get("username")
        if username not in self._users:
            return self._return_auth_challenge(request_handler)
        password = self._users[username]

        nonce = auth_dict.get("nonce")
        if nonce not in self._nonces:
            return self._return_auth_challenge(request_handler)
        # Each nonce is good for one request only.
        self._nonces.remove(nonce)

        # MSIE uses short_path in its validation, but Python's
        # urllib.request uses the full path, so we're going to see if
        # either of them works here.
        candidate_paths = (request_handler.path, request_handler.short_path)
        if any(self._validate_auth(auth_dict, password,
                                   request_handler.command, path)
               for path in candidate_paths):
            return True
        return self._return_auth_challenge(request_handler)
200
201
class BasicAuthHandler(http.server.BaseHTTPRequestHandler):
    """Request handler that guards GET behind HTTP basic authentication."""
    # Credentials and realm accepted by the server side
    USER = 'testUser'
    PASSWD = 'testPass'
    REALM = 'Test'
    USER_PASSWD = USER + ":" + PASSWD
    # Base64 text expected after "Basic " in the Authorization header
    ENCODED_AUTH = base64.b64encode(USER_PASSWD.encode('ascii')).decode('ascii')

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def log_message(self, format, *args):
        """Discard log output so nothing is written to the console."""
        return None

    def do_HEAD(self):
        """Reply 200 with an HTML content type and no body."""
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()

    def do_AUTHHEAD(self):
        """Reply 401 with a Basic challenge for REALM."""
        challenge = 'Basic realm="%s"' % self.REALM
        self.send_response(401)
        self.send_header("WWW-Authenticate", challenge)
        self.send_header("Content-type", "text/html")
        self.end_headers()

    def do_GET(self):
        """Serve the request only if the expected credentials were sent.

        A missing or empty Authorization header gets a 401 challenge plus
        a short body, wrong credentials get a bare 401 challenge, and the
        expected credentials get a 200 response.
        """
        supplied = self.headers.get("Authorization", "")
        if not supplied:
            self.do_AUTHHEAD()
            self.wfile.write(b"No Auth header received")
            return
        if supplied != "Basic " + self.ENCODED_AUTH:
            # Request Unauthorized
            self.do_AUTHHEAD()
            return
        self.send_response(200)
        self.end_headers()
        self.wfile.write(b"It works")
241
242
243
244# Proxy test infrastructure
245
class FakeProxyHandler(http.server.BaseHTTPRequestHandler):
    """A 'fake proxy' that answers every request itself, claiming that
    the entire internet has gone down due to a sudden zombie invasion.
    Its main utility is in providing us with authentication support for
    testing.
    """

    def __init__(self, digest_auth_handler, *args, **kwargs):
        # The parent's __init__() will try to call do_GET(), so the auth
        # handler has to be in place before it runs.
        self.digest_auth_handler = digest_auth_handler
        super().__init__(*args, **kwargs)

    def log_message(self, format, *args):
        """Discard log output."""
        # For debugging, write ``format % args`` to sys.stderr here.
        return None

    def do_GET(self):
        """Authenticate the request and, on success, send the canned page.

        The path component of the requested URL is stored as
        ``self.short_path`` before the digest auth handler is consulted.
        Nothing further is sent here when authentication fails.
        """
        self.short_path = urllib.parse.urlparse(self.path, "http").path
        if not self.digest_auth_handler.handle_request(self):
            return
        self.send_response(200, "OK")
        self.send_header("Content-Type", "text/html")
        self.end_headers()
        greeting = "You've reached %s!<BR>" % self.path
        self.wfile.write(greeting.encode("ascii"))
        self.wfile.write(b"Our apologies, but our server is down due to "
                         b"a sudden zombie invasion.")
276
277# Test cases
278
class BasicAuthTests(unittest.TestCase):
    """Exercise urllib.request's basic-auth handler against a loopback
    server that uses BasicAuthHandler."""

    USER = "testUser"
    PASSWD = "testPass"
    INCORRECT_PASSWD = "Incorrect"
    REALM = "Test"

    def setUp(self):
        super(BasicAuthTests, self).setUp()
        # Both tests install a global opener; clear the _opener global
        # afterwards so it cannot leak into other tests.
        self.addCleanup(urllib.request.urlcleanup)
        # With Basic Authentication.  The factory function (rather than the
        # handler class itself) is what gets passed to the server thread,
        # which sets an attribute on whatever object it is given.
        def http_server_with_basic_auth_handler(*args, **kwargs):
            return BasicAuthHandler(*args, **kwargs)
        self.server = LoopbackHttpServerThread(http_server_with_basic_auth_handler)
        self.addCleanup(self.stop_server)
        self.server_url = 'http://127.0.0.1:%s' % self.server.port
        self.server.start()
        self.server.ready.wait()

    def stop_server(self):
        """Stop the loopback server thread and drop the reference to it."""
        self.server.stop()
        self.server = None

    def tearDown(self):
        super(BasicAuthTests, self).tearDown()

    def test_basic_auth_success(self):
        ah = urllib.request.HTTPBasicAuthHandler()
        ah.add_password(self.REALM, self.server_url, self.USER, self.PASSWD)
        urllib.request.install_opener(urllib.request.build_opener(ah))
        try:
            # The context manager closes the response once it was checked.
            with urllib.request.urlopen(self.server_url) as response:
                self.assertTrue(response)
        except urllib.error.HTTPError as exc:
            # The error object wraps the server's response; release it.
            exc.close()
            self.fail("Basic auth failed for the url: %s" % self.server_url)

    def test_basic_auth_httperror(self):
        ah = urllib.request.HTTPBasicAuthHandler()
        ah.add_password(self.REALM, self.server_url, self.USER, self.INCORRECT_PASSWD)
        urllib.request.install_opener(urllib.request.build_opener(ah))
        with self.assertRaises(urllib.error.HTTPError) as cm:
            urllib.request.urlopen(self.server_url)
        # The error object wraps the server's response; release it.
        cm.exception.close()
317
318
@hashlib_helper.requires_hashdigest("md5")
class ProxyAuthTests(unittest.TestCase):
    """Exercise urllib.request's proxy digest-auth handler against a
    loopback FakeProxyHandler server guarded by a DigestAuthHandler."""

    URL = "http://localhost"

    USER = "tester"
    PASSWD = "test123"
    REALM = "TestRealm"

    def setUp(self):
        super(ProxyAuthTests, self).setUp()
        # Ignore proxy bypass settings in the environment.
        def restore_environ(old_environ):
            os.environ.clear()
            os.environ.update(old_environ)
        self.addCleanup(restore_environ, os.environ.copy())
        os.environ['NO_PROXY'] = ''
        os.environ['no_proxy'] = ''

        self.digest_auth_handler = DigestAuthHandler()
        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
        self.digest_auth_handler.set_realm(self.REALM)
        # With Digest Authentication.
        def create_fake_proxy_handler(*args, **kwargs):
            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
        self.addCleanup(self.stop_server)
        self.server.start()
        self.server.ready.wait()
        proxy_url = "http://127.0.0.1:%d" % self.server.port
        handler = urllib.request.ProxyHandler({"http" : proxy_url})
        self.proxy_digest_handler = urllib.request.ProxyDigestAuthHandler()
        self.opener = urllib.request.build_opener(
            handler, self.proxy_digest_handler)

    def stop_server(self):
        """Stop the loopback proxy thread and drop the reference to it."""
        self.server.stop()
        self.server = None

    def test_proxy_with_bad_password_raises_httperror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD+"bad")
        self.digest_auth_handler.set_qop("auth")
        with self.assertRaises(urllib.error.HTTPError) as cm:
            self.opener.open(self.URL)
        # The error object wraps the proxy's response; release it.
        cm.exception.close()

    def test_proxy_with_no_password_raises_httperror(self):
        self.digest_auth_handler.set_qop("auth")
        with self.assertRaises(urllib.error.HTTPError) as cm:
            self.opener.open(self.URL)
        # The error object wraps the proxy's response; release it.
        cm.exception.close()

    def test_proxy_qop_auth_works(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth")
        with self.opener.open(self.URL) as result:
            # Drain the response completely.
            while result.read():
                pass

    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth-int")
        try:
            result = self.opener.open(self.URL)
        except urllib.error.URLError:
            # It's okay if we don't support auth-int, but we certainly
            # shouldn't receive any kind of exception here other than
            # a URLError.
            pass
        else:
            with result:
                while result.read():
                    pass
395
396
def GetRequestHandler(responses):
    """Return a new request-handler class that replays *responses*.

    *responses* is a list of ``(status, headers, body)`` tuples; each
    handled request removes and sends the first remaining entry.  Header
    values are %-formatted with a mapping that provides ``port``.  The
    returned class records request paths (and POST payloads) in
    ``requests`` and the most recent request headers in
    ``headers_received``.
    """

    class FakeHTTPRequestHandler(http.server.BaseHTTPRequestHandler):

        server_version = "TestHTTP/"
        requests = []
        headers_received = []
        port = 80

        def do_GET(self):
            # write() may accept only part of the data, so keep going
            # until nothing is left.
            remaining = self.send_head()
            while remaining:
                written = self.wfile.write(remaining)
                remaining = remaining[written:]

        def do_POST(self):
            size = int(self.headers["Content-Length"])
            payload = self.rfile.read(size)
            self.do_GET()
            # Recorded after the path that do_GET() appended.
            self.requests.append(payload)

        def send_head(self):
            """Send status line and headers; return the body, or None when
            the canned response has no body."""
            FakeHTTPRequestHandler.headers_received = self.headers
            self.requests.append(self.path)
            status, extra_headers, body = responses.pop(0)

            self.send_response(status)

            substitutions = {'port': self.port}
            for name, template in extra_headers:
                self.send_header(name, template % substitutions)
            if not body:
                self.end_headers()
                return None
            self.send_header("Content-type", "text/plain")
            self.end_headers()
            return body

        def log_message(self, *args):
            return None


    return FakeHTTPRequestHandler
438
439
class TestUrlopen(unittest.TestCase):
    """Tests urllib.request.urlopen against loopback HTTP(S) servers.

    These tests are not exhaustive.  Assuming that testing using files does a
    good job overall of some of the basic interface features.  Each test
    starts its own server whose canned responses come from
    GetRequestHandler.  There are no tests exercising the optional
    'proxies' argument.
    """

    def setUp(self):
        super(TestUrlopen, self).setUp()

        # clear _opener global variable
        self.addCleanup(urllib.request.urlcleanup)

        # Ignore proxies for localhost tests.
        def restore_environ(old_environ):
            os.environ.clear()
            os.environ.update(old_environ)
        self.addCleanup(restore_environ, os.environ.copy())
        os.environ['NO_PROXY'] = '*'
        os.environ['no_proxy'] = '*'

    def urlopen(self, url, data=None, **kwargs):
        """Open *url*, read it through several different read methods and
        return everything that was read as one bytes object."""
        chunks = []
        f = urllib.request.urlopen(url, data, **kwargs)
        try:
            # Exercise various methods
            chunks.extend(f.readlines(200))
            chunks.append(f.readline())
            chunks.append(f.read(1024))
            chunks.append(f.read())
        finally:
            f.close()
        return b"".join(chunks)

    def stop_server(self):
        """Stop the loopback server thread and drop the reference to it."""
        self.server.stop()
        self.server = None

    def start_server(self, responses=None):
        """Start a loopback HTTP server replaying *responses* and return
        its handler class (whose ``port`` is set to the server's port)."""
        if responses is None:
            responses = [(200, [], b"we don't care")]
        handler = GetRequestHandler(responses)

        self.server = LoopbackHttpServerThread(handler)
        self.addCleanup(self.stop_server)
        self.server.start()
        self.server.ready.wait()
        port = self.server.port
        handler.port = port
        return handler

    def start_https_server(self, responses=None, **kwargs):
        """Start an HTTPS server replaying *responses* and return its
        handler class; skips the test when urllib.request has no
        HTTPSHandler.  Extra keyword arguments go to make_https_server()."""
        if not hasattr(urllib.request, 'HTTPSHandler'):
            self.skipTest('ssl support required')
        from test.ssl_servers import make_https_server
        if responses is None:
            responses = [(200, [], b"we care a bit")]
        handler = GetRequestHandler(responses)
        server = make_https_server(self, handler_class=handler, **kwargs)
        handler.port = server.port
        return handler

    def test_redirection(self):
        expected_response = b"We got here..."
        responses = [
            (302, [("Location", "http://localhost:%(port)s/somewhere_else")],
             ""),
            (200, [], expected_response)
        ]

        handler = self.start_server(responses)
        data = self.urlopen("http://localhost:%s/" % handler.port)
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/", "/somewhere_else"])

    def test_chunked(self):
        expected_response = b"hello world"
        chunked_start = (
                        b'a\r\n'
                        b'hello worl\r\n'
                        b'1\r\n'
                        b'd\r\n'
                        b'0\r\n'
                        )
        response = [(200, [("Transfer-Encoding", "chunked")], chunked_start)]
        handler = self.start_server(response)
        data = self.urlopen("http://localhost:%s/" % handler.port)
        self.assertEqual(data, expected_response)

    def test_404(self):
        expected_response = b"Bad bad bad..."
        handler = self.start_server([(404, [], expected_response)])

        try:
            self.urlopen("http://localhost:%s/weeble" % handler.port)
        except urllib.error.URLError as f:
            data = f.read()
            f.close()
        else:
            self.fail("404 should raise URLError")

        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/weeble"])

    def test_200(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        data = self.urlopen("http://localhost:%s/bizarre" % handler.port)
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/bizarre"])

    def test_200_with_parameters(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        data = self.urlopen("http://localhost:%s/bizarre" % handler.port,
                             b"get=with_feeling")
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/bizarre", b"get=with_feeling"])

    def test_https(self):
        handler = self.start_https_server()
        context = ssl.create_default_context(cafile=CERT_localhost)
        data = self.urlopen("https://localhost:%s/bizarre" % handler.port, context=context)
        self.assertEqual(data, b"we care a bit")

    def test_https_with_cafile(self):
        handler = self.start_https_server(certfile=CERT_localhost)
        with support.check_warnings(('', DeprecationWarning)):
            # Good cert
            data = self.urlopen("https://localhost:%s/bizarre" % handler.port,
                                cafile=CERT_localhost)
            self.assertEqual(data, b"we care a bit")
            # Bad cert
            with self.assertRaises(urllib.error.URLError):
                self.urlopen("https://localhost:%s/bizarre" % handler.port,
                             cafile=CERT_fakehostname)
            # Good cert, but mismatching hostname
            handler = self.start_https_server(certfile=CERT_fakehostname)
            with self.assertRaises(urllib.error.URLError):
                self.urlopen("https://localhost:%s/bizarre" % handler.port,
                             cafile=CERT_fakehostname)

    def test_https_with_cadefault(self):
        handler = self.start_https_server(certfile=CERT_localhost)
        # Self-signed cert should fail verification with system certificate store
        with support.check_warnings(('', DeprecationWarning)):
            with self.assertRaises(urllib.error.URLError):
                self.urlopen("https://localhost:%s/bizarre" % handler.port,
                             cadefault=True)

    def test_https_sni(self):
        if ssl is None:
            self.skipTest("ssl module required")
        if not ssl.HAS_SNI:
            self.skipTest("SNI support required in OpenSSL")
        sni_name = None
        def cb_sni(ssl_sock, server_name, initial_context):
            nonlocal sni_name
            sni_name = server_name
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.set_servername_callback(cb_sni)
        handler = self.start_https_server(context=context, certfile=CERT_localhost)
        context = ssl.create_default_context(cafile=CERT_localhost)
        self.urlopen("https://localhost:%s" % handler.port, context=context)
        self.assertEqual(sni_name, "localhost")

    def test_sending_headers(self):
        handler = self.start_server()
        req = urllib.request.Request("http://localhost:%s/" % handler.port,
                                     headers={"Range": "bytes=20-39"})
        with urllib.request.urlopen(req):
            pass
        self.assertEqual(handler.headers_received["Range"], "bytes=20-39")

    def test_basic(self):
        handler = self.start_server()
        with urllib.request.urlopen("http://localhost:%s" % handler.port) as open_url:
            for attr in ("read", "close", "info", "geturl"):
                self.assertTrue(hasattr(open_url, attr), "object returned from "
                             "urlopen lacks the %s attribute" % attr)
            self.assertTrue(open_url.read(), "calling 'read' failed")

    def test_info(self):
        handler = self.start_server()
        open_url = urllib.request.urlopen(
            "http://localhost:%s" % handler.port)
        with open_url:
            info_obj = open_url.info()
        self.assertIsInstance(info_obj, email.message.Message,
                              "object returned by 'info' is not an "
                              "instance of email.message.Message")
        self.assertEqual(info_obj.get_content_subtype(), "plain")

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        handler = self.start_server()
        open_url = urllib.request.urlopen("http://localhost:%s" % handler.port)
        with open_url:
            url = open_url.geturl()
        self.assertEqual(url, "http://localhost:%s" % handler.port)

    def test_iteration(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        # The context manager closes the response when iteration is done.
        with urllib.request.urlopen("http://localhost:%s" % handler.port) as data:
            for line in data:
                self.assertEqual(line, expected_response)

    def test_line_iteration(self):
        lines = [b"We\n", b"got\n", b"here\n", b"verylong " * 8192 + b"\n"]
        expected_response = b"".join(lines)
        handler = self.start_server([(200, [], expected_response)])
        # Collect the lines first so the response is closed before any
        # assertion can fail, and so the final count check works even if
        # nothing (or too much) was read.
        with urllib.request.urlopen("http://localhost:%s" % handler.port) as data:
            fetched = list(data)
        for index, (line, expected) in enumerate(zip(fetched, lines)):
            self.assertEqual(line, expected,
                             "Fetched line number %s doesn't match expected:\n"
                             "    Expected length was %s, got %s" %
                             (index, len(expected), len(line)))
        self.assertEqual(len(fetched), len(lines))
661
662
663threads_key = None
664
665def setUpModule():
666    # Store the threading_setup in a key and ensure that it is cleaned up
667    # in the tearDown
668    global threads_key
669    threads_key = support.threading_setup()
670
671def tearDownModule():
672    if threads_key:
673        support.threading_cleanup(*threads_key)
674
675if __name__ == "__main__":
676    unittest.main()
677