• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import base64
2import os
3import email
4import urllib.parse
5import urllib.request
6import http.server
7import threading
8import unittest
9import hashlib
10
11from test.support import hashlib_helper
12from test.support import threading_helper
13from test.support import warnings_helper
14
15try:
16    import ssl
17except ImportError:
18    ssl = None
19
# Directory containing this module; the certificate files below are
# looked up relative to it.
here = os.path.dirname(__file__)
# Self-signed cert file for 'localhost'
CERT_localhost = os.path.join(here, 'keycert.pem')
# Self-signed cert file for 'fakehostname'
CERT_fakehostname = os.path.join(here, 'keycert2.pem')
25
26
27# Loopback http server infrastructure
28
class LoopbackHttpServer(http.server.HTTPServer):
    """An ``HTTPServer`` with socket timeouts suited to loopback testing.

    The listening socket gets a very short timeout so that the server can
    be stopped easily, and every accepted connection gets a timeout of its
    own.
    """

    def __init__(self, server_address, RequestHandlerClass):
        super().__init__(server_address, RequestHandlerClass)

        # Keep the listening socket's timeout really low so that we can
        # stop the server easily.
        self.socket.settimeout(0.1)

    def get_request(self):
        """Accept a connection and give it a timeout (HTTPServer override).

        Returns the ``(socket, client_address)`` pair obtained from
        ``accept()``.
        """
        connection, peer_address = self.socket.accept()

        # It's a loopback connection, so a 10 second timeout shouldn't
        # affect anything, but should make deadlocks less likely to occur.
        connection.settimeout(10.0)

        return connection, peer_address
54
class LoopbackHttpServerThread(threading.Thread):
    """Stoppable thread that runs a loopback http server.

    The server is bound to an ephemeral port on 127.0.0.1 when the thread
    object is created; the chosen port is available as ``self.port``.
    ``self.ready`` is set once the thread has started running.
    """

    def __init__(self, request_handler):
        """Create the server; *request_handler* is the handler factory.

        Note that ``protocol_version`` is set to "HTTP/1.0" on the object
        passed in.
        """
        super().__init__()
        self._stop_server = False
        self.ready = threading.Event()
        request_handler.protocol_version = "HTTP/1.0"
        self.httpd = LoopbackHttpServer(("127.0.0.1", 0),
                                        request_handler)
        self.port = self.httpd.server_port

    def stop(self):
        """Stops the webserver if it's currently running.

        Safe to call even when the thread was never started: the join is
        skipped in that case, and the listening socket is always closed.
        """
        self._stop_server = True

        try:
            # join() raises RuntimeError on a thread that was never
            # started; ident is None until start() has run.
            if self.ident is not None:
                self.join()
        finally:
            # Release the listening socket even if joining failed.
            self.httpd.server_close()

    def run(self):
        """Serve requests one at a time until stop() is requested."""
        self.ready.set()
        while not self._stop_server:
            self.httpd.handle_request()
79
80# Authentication infrastructure
81
class DigestAuthHandler:
    """Handler for performing digest authentication.

    Issues 407 challenges carrying single-use nonces and validates the
    ``Proxy-Authorization`` header of follow-up requests against the
    configured users.  Driven by a request handler via handle_request().
    """

    def __init__(self):
        self._request_num = 0   # counter the nonces are derived from
        self._nonces = []       # nonces issued and not yet consumed
        self._users = {}        # username -> password
        self._realm_name = "Test Realm"
        self._qop = "auth"

    def set_qop(self, qop):
        """Set the qop value advertised in challenges."""
        self._qop = qop

    def set_users(self, users):
        """Set the accepted users as a dict mapping username to password."""
        assert isinstance(users, dict)
        self._users = users

    def set_realm(self, realm):
        """Set the realm name advertised in challenges."""
        self._realm_name = realm

    @staticmethod
    def _md5_hex(text):
        """Return the hex MD5 digest of the ASCII-encoded *text*."""
        return hashlib.md5(text.encode("ascii")).hexdigest()

    def _generate_nonce(self):
        """Create, remember and return a new nonce.

        The nonce is the MD5 of a running request counter, so the sequence
        is deterministic.
        """
        self._request_num += 1
        nonce = self._md5_hex(str(self._request_num))
        self._nonces.append(nonce)
        return nonce

    def _create_auth_dict(self, auth_str):
        """Parse an authorization header value into a dict.

        Everything up to the first space (the scheme) is dropped and the
        rest is split on commas into ``name=value`` pairs.  Names and values
        are stripped of surrounding whitespace and values lose one pair of
        enclosing double quotes.  Parts without "=" are ignored.
        """
        params = auth_str[auth_str.find(" ") + 1:]

        auth_dict = {}
        for part in params.split(","):
            # Split on the first "=" only: values such as a uri with a
            # query string legitimately contain "=" themselves.
            name, separator, value = part.partition("=")
            if not separator:
                continue
            value = value.strip()
            if len(value) >= 2 and value[0] == '"' and value[-1] == '"':
                value = value[1:-1]
            auth_dict[name.strip()] = value
        return auth_dict

    def _validate_auth(self, auth_dict, password, method, uri):
        """Return True if auth_dict["response"] is the expected digest.

        The expected value is MD5(HA1:nonce:nc:cnonce:qop:HA2) with
        HA1 = MD5(username:realm:password) and HA2 = MD5(method:uri).
        Returns False when a field needed for the computation is missing.
        """
        required = ("username", "realm", "nonce", "nc", "cnonce", "qop",
                    "response")
        if any(field not in auth_dict for field in required):
            return False

        final_dict = dict(auth_dict)
        final_dict["password"] = password
        final_dict["method"] = method
        final_dict["uri"] = uri
        final_dict["HA1"] = self._md5_hex(
            "%(username)s:%(realm)s:%(password)s" % final_dict)
        final_dict["HA2"] = self._md5_hex("%(method)s:%(uri)s" % final_dict)
        expected = self._md5_hex(
            "%(HA1)s:%(nonce)s:%(nc)s:"
            "%(cnonce)s:%(qop)s:%(HA2)s" % final_dict)

        return expected == auth_dict["response"]

    def _return_auth_challenge(self, request_handler):
        """Send a 407 digest challenge with a fresh nonce; return False."""
        request_handler.send_response(407, "Proxy Authentication Required")
        request_handler.send_header("Content-Type", "text/html")
        # The header value is kept exactly as it has always been sent,
        # including its trailing ", " separator.
        request_handler.send_header(
            'Proxy-Authenticate', 'Digest realm="%s", '
            'qop="%s",'
            'nonce="%s", ' % \
            (self._realm_name, self._qop, self._generate_nonce()))
        # XXX: Not sure if we're supposed to add this next header or
        # not.
        #request_handler.send_header('Connection', 'close')
        request_handler.end_headers()
        request_handler.wfile.write(b"Proxy Authentication Required.")
        return False

    def handle_request(self, request_handler):
        """Performs digest authentication on the given HTTP request
        handler.  Returns True if authentication was successful, False
        otherwise.

        If no users have been set, then digest auth is effectively
        disabled and this method will always return True.

        Whenever False is returned, a 407 challenge has already been sent
        through request_handler.
        """
        if not self._users:
            return True

        if "Proxy-Authorization" not in request_handler.headers:
            return self._return_auth_challenge(request_handler)

        auth_dict = self._create_auth_dict(
            request_handler.headers["Proxy-Authorization"])

        # An unknown or missing username gets a fresh challenge.
        username = auth_dict.get("username")
        if username not in self._users:
            return self._return_auth_challenge(request_handler)
        password = self._users[username]

        # Each nonce may be used once only.
        nonce = auth_dict.get("nonce")
        if nonce not in self._nonces:
            return self._return_auth_challenge(request_handler)
        self._nonces.remove(nonce)

        # MSIE uses short_path in its validation, but Python's
        # urllib.request uses the full path, so we're going to see if
        # either of them works here.
        candidate_paths = (request_handler.path, request_handler.short_path)
        if any(self._validate_auth(auth_dict, password,
                                   request_handler.command, path)
               for path in candidate_paths):
            return True
        return self._return_auth_challenge(request_handler)
201
202
class BasicAuthHandler(http.server.BaseHTTPRequestHandler):
    """Request handler that protects GET with HTTP basic authentication."""
    # Credentials and realm expected by the server side.
    USER = 'testUser'
    PASSWD = 'testPass'
    REALM = 'Test'
    USER_PASSWD = USER + ":" + PASSWD
    # Base64 form of "USER:PASSWD", as carried in the Authorization header.
    ENCODED_AUTH = base64.b64encode(USER_PASSWD.encode('ascii')).decode('ascii')

    def log_message(self, format, *args):
        """Suppress the console log message."""

    def do_HEAD(self):
        """Answer 200 with an HTML content type and no body."""
        self.send_response(200)
        self.send_header("Content-type", "text/html")
        self.end_headers()

    def do_AUTHHEAD(self):
        """Answer 401 with a basic-auth challenge for REALM."""
        self.send_response(401)
        self.send_header("WWW-Authenticate", 'Basic realm="%s"' % self.REALM)
        self.send_header("Content-type", "text/html")
        self.end_headers()

    def do_GET(self):
        """Serve "It works" for the expected credentials, 401 otherwise."""
        supplied = self.headers.get("Authorization", "")

        if not supplied:
            # No credentials at all: challenge and explain.
            self.do_AUTHHEAD()
            self.wfile.write(b"No Auth header received")
            return

        if supplied == "Basic " + self.ENCODED_AUTH:
            self.send_response(200)
            self.end_headers()
            self.wfile.write(b"It works")
            return

        # Request Unauthorized
        self.do_AUTHHEAD()
242
243
244
245# Proxy test infrastructure
246
class FakeProxyHandler(http.server.BaseHTTPRequestHandler):
    """This is a 'fake proxy' that makes it look like the entire
    internet has gone down due to a sudden zombie invasion.  Its main
    utility is in providing us with authentication support for
    testing.
    """

    def __init__(self, digest_auth_handler, *args, **kwargs):
        # The auth handler must be in place before the parent's __init__()
        # runs, because that call will try to call do_GET().
        self.digest_auth_handler = digest_auth_handler
        super().__init__(*args, **kwargs)

    def log_message(self, format, *args):
        # Logging is silenced.  For debugging, write ``format % args`` to
        # stderr here.
        pass

    def do_GET(self):
        """Answer with a canned page once digest authentication passes.

        The path component of the requested URL is stored in
        ``self.short_path`` for the auth handler.  When authentication
        fails nothing more is written here.
        """
        self.short_path = urllib.parse.urlparse(self.path, "http").path
        if not self.digest_auth_handler.handle_request(self):
            return

        self.send_response(200, "OK")
        self.send_header("Content-Type", "text/html")
        self.end_headers()
        greeting = "You've reached %s!<BR>" % self.path
        self.wfile.write(greeting.encode("ascii"))
        self.wfile.write(b"Our apologies, but our server is down due to "
                         b"a sudden zombie invasion.")
277
278# Test cases
279
class BasicAuthTests(unittest.TestCase):
    """Tests urllib.request's basic authentication against a loopback
    server that runs BasicAuthHandler.
    """
    USER = "testUser"
    PASSWD = "testPass"
    INCORRECT_PASSWD = "Incorrect"
    REALM = "Test"

    def setUp(self):
        super().setUp()
        # The tests below install a global opener; clear the _opener
        # global variable afterwards so it cannot leak into other tests.
        self.addCleanup(urllib.request.urlcleanup)
        # With Basic Authentication
        def http_server_with_basic_auth_handler(*args, **kwargs):
            return BasicAuthHandler(*args, **kwargs)
        self.server = LoopbackHttpServerThread(http_server_with_basic_auth_handler)
        self.addCleanup(self.stop_server)
        self.server_url = 'http://127.0.0.1:%s' % self.server.port
        self.server.start()
        self.server.ready.wait()

    def stop_server(self):
        self.server.stop()
        self.server = None

    def test_basic_auth_success(self):
        ah = urllib.request.HTTPBasicAuthHandler()
        ah.add_password(self.REALM, self.server_url, self.USER, self.PASSWD)
        urllib.request.install_opener(urllib.request.build_opener(ah))
        try:
            # Close the response so its socket is not leaked.
            with urllib.request.urlopen(self.server_url) as response:
                self.assertTrue(response)
        except urllib.error.HTTPError as err:
            if err.fp is not None:
                err.close()
            self.fail("Basic auth failed for the url: %s" % self.server_url)

    def test_basic_auth_httperror(self):
        ah = urllib.request.HTTPBasicAuthHandler()
        ah.add_password(self.REALM, self.server_url, self.USER, self.INCORRECT_PASSWD)
        urllib.request.install_opener(urllib.request.build_opener(ah))
        with self.assertRaises(urllib.error.HTTPError) as cm:
            urllib.request.urlopen(self.server_url)
        # The error doubles as the HTTP response; close it when it carries
        # an open file object.
        if cm.exception.fp is not None:
            cm.exception.close()
318
319
@hashlib_helper.requires_hashdigest("md5", openssl=True)
class ProxyAuthTests(unittest.TestCase):
    """Tests urllib.request's proxy digest authentication against a
    loopback FakeProxyHandler guarded by a DigestAuthHandler.
    """
    URL = "http://localhost"

    USER = "tester"
    PASSWD = "test123"
    REALM = "TestRealm"

    def setUp(self):
        super().setUp()
        # Ignore proxy bypass settings in the environment.
        def restore_environ(old_environ):
            os.environ.clear()
            os.environ.update(old_environ)
        self.addCleanup(restore_environ, os.environ.copy())
        os.environ['NO_PROXY'] = ''
        os.environ['no_proxy'] = ''

        self.digest_auth_handler = DigestAuthHandler()
        self.digest_auth_handler.set_users({self.USER: self.PASSWD})
        self.digest_auth_handler.set_realm(self.REALM)
        # With Digest Authentication.
        def create_fake_proxy_handler(*args, **kwargs):
            return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)

        self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
        self.addCleanup(self.stop_server)
        self.server.start()
        self.server.ready.wait()
        proxy_url = "http://127.0.0.1:%d" % self.server.port
        handler = urllib.request.ProxyHandler({"http" : proxy_url})
        self.proxy_digest_handler = urllib.request.ProxyDigestAuthHandler()
        self.opener = urllib.request.build_opener(
            handler, self.proxy_digest_handler)

    def stop_server(self):
        self.server.stop()
        self.server = None

    def _assert_open_raises_httperror(self):
        """Open URL, expect an HTTPError and release its response."""
        with self.assertRaises(urllib.error.HTTPError) as cm:
            self.opener.open(self.URL)
        # The error doubles as the HTTP response.  It may have been raised
        # without a file object, so only close it when there is one.
        if cm.exception.fp is not None:
            cm.exception.close()

    def test_proxy_with_bad_password_raises_httperror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD+"bad")
        self.digest_auth_handler.set_qop("auth")
        self._assert_open_raises_httperror()

    def test_proxy_with_no_password_raises_httperror(self):
        self.digest_auth_handler.set_qop("auth")
        self._assert_open_raises_httperror()

    def test_proxy_qop_auth_works(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth")
        with self.opener.open(self.URL) as result:
            while result.read():
                pass

    def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
        self.proxy_digest_handler.add_password(self.REALM, self.URL,
                                               self.USER, self.PASSWD)
        self.digest_auth_handler.set_qop("auth-int")
        try:
            result = self.opener.open(self.URL)
        except urllib.error.URLError:
            # It's okay if we don't support auth-int, but we certainly
            # shouldn't receive any kind of exception here other than
            # a URLError.
            pass
        else:
            with result:
                while result.read():
                    pass
396
397
def GetRequestHandler(responses):
    """Return a request handler class that replays canned *responses*.

    *responses* is a list of ``(status, headers, body)`` tuples; each
    handled request pops and sends the first one.  Header values are
    %-formatted with a ``port`` key taken from the handler's ``port``
    attribute.  Requested paths (and POST payloads) are collected in the
    class attribute ``requests``; the headers of the latest request are
    stored in ``headers_received``.
    """

    class FakeHTTPRequestHandler(http.server.BaseHTTPRequestHandler):

        server_version = "TestHTTP/"
        requests = []
        headers_received = []
        port = 80

        def do_GET(self):
            remaining = self.send_head()
            # write() may accept only part of the data, so keep going
            # until everything has been handed over.
            while remaining:
                written = self.wfile.write(remaining)
                remaining = remaining[written:]

        def do_POST(self):
            length = int(self.headers["Content-Length"])
            payload = self.rfile.read(length)
            self.do_GET()
            self.requests.append(payload)

        def send_head(self):
            """Send status and headers; return the body, or None if empty."""
            FakeHTTPRequestHandler.headers_received = self.headers
            self.requests.append(self.path)
            status, extra_headers, body = responses.pop(0)

            self.send_response(status)

            substitutions = {'port': self.port}
            for name, template in extra_headers:
                self.send_header(name, template % substitutions)

            if not body:
                self.end_headers()
                return None

            self.send_header("Content-type", "text/plain")
            self.end_headers()
            return body

        def log_message(self, *args):
            pass


    return FakeHTTPRequestHandler
439
440
class TestUrlopen(unittest.TestCase):
    """Tests urllib.request.urlopen against loopback HTTP(S) servers.

    These tests are not exhaustive.  Assuming that testing using files does a
    good job overall of some of the basic interface features.  The optional
    'data' argument and redirection are each exercised by a single test
    only.
    """

    def setUp(self):
        super().setUp()

        # clear _opener global variable
        self.addCleanup(urllib.request.urlcleanup)

        # Ignore proxies for localhost tests.
        def restore_environ(old_environ):
            os.environ.clear()
            os.environ.update(old_environ)
        self.addCleanup(restore_environ, os.environ.copy())
        os.environ['NO_PROXY'] = '*'
        os.environ['no_proxy'] = '*'

    def urlopen(self, url, data=None, **kwargs):
        """Open *url*, read it through several methods, return all bytes."""
        chunks = []
        f = urllib.request.urlopen(url, data, **kwargs)
        try:
            # Exercise various methods
            chunks.extend(f.readlines(200))
            chunks.append(f.readline())
            chunks.append(f.read(1024))
            chunks.append(f.read())
        finally:
            f.close()
        return b"".join(chunks)

    def stop_server(self):
        self.server.stop()
        self.server = None

    def start_server(self, responses=None):
        """Start a loopback HTTP server replaying *responses*.

        Returns the handler class, whose ``port`` attribute is set to the
        server's port.
        """
        if responses is None:
            responses = [(200, [], b"we don't care")]
        handler = GetRequestHandler(responses)

        self.server = LoopbackHttpServerThread(handler)
        self.addCleanup(self.stop_server)
        self.server.start()
        self.server.ready.wait()
        port = self.server.port
        handler.port = port
        return handler

    def start_https_server(self, responses=None, **kwargs):
        """Start an HTTPS server replaying *responses*; skip without ssl.

        Extra keyword arguments are passed on to make_https_server().
        Returns the handler class with its ``port`` attribute set.
        """
        if not hasattr(urllib.request, 'HTTPSHandler'):
            self.skipTest('ssl support required')
        from test.ssl_servers import make_https_server
        if responses is None:
            responses = [(200, [], b"we care a bit")]
        handler = GetRequestHandler(responses)
        server = make_https_server(self, handler_class=handler, **kwargs)
        handler.port = server.port
        return handler

    def test_redirection(self):
        expected_response = b"We got here..."
        responses = [
            (302, [("Location", "http://localhost:%(port)s/somewhere_else")],
             ""),
            (200, [], expected_response)
        ]

        handler = self.start_server(responses)
        data = self.urlopen("http://localhost:%s/" % handler.port)
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/", "/somewhere_else"])

    def test_chunked(self):
        expected_response = b"hello world"
        chunked_start = (
                        b'a\r\n'
                        b'hello worl\r\n'
                        b'1\r\n'
                        b'd\r\n'
                        b'0\r\n'
                        )
        response = [(200, [("Transfer-Encoding", "chunked")], chunked_start)]
        handler = self.start_server(response)
        data = self.urlopen("http://localhost:%s/" % handler.port)
        self.assertEqual(data, expected_response)

    def test_404(self):
        expected_response = b"Bad bad bad..."
        handler = self.start_server([(404, [], expected_response)])

        try:
            self.urlopen("http://localhost:%s/weeble" % handler.port)
        except urllib.error.URLError as f:
            data = f.read()
            f.close()
        else:
            self.fail("404 should raise URLError")

        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/weeble"])

    def test_200(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        data = self.urlopen("http://localhost:%s/bizarre" % handler.port)
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/bizarre"])

    def test_200_with_parameters(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        data = self.urlopen("http://localhost:%s/bizarre" % handler.port,
                             b"get=with_feeling")
        self.assertEqual(data, expected_response)
        self.assertEqual(handler.requests, ["/bizarre", b"get=with_feeling"])

    def test_https(self):
        handler = self.start_https_server()
        context = ssl.create_default_context(cafile=CERT_localhost)
        data = self.urlopen("https://localhost:%s/bizarre" % handler.port, context=context)
        self.assertEqual(data, b"we care a bit")

    def test_https_with_cafile(self):
        handler = self.start_https_server(certfile=CERT_localhost)
        with warnings_helper.check_warnings(('', DeprecationWarning)):
            # Good cert
            data = self.urlopen("https://localhost:%s/bizarre" % handler.port,
                                cafile=CERT_localhost)
            self.assertEqual(data, b"we care a bit")
            # Bad cert
            with self.assertRaises(urllib.error.URLError):
                self.urlopen("https://localhost:%s/bizarre" % handler.port,
                             cafile=CERT_fakehostname)
            # Good cert, but mismatching hostname
            handler = self.start_https_server(certfile=CERT_fakehostname)
            with self.assertRaises(urllib.error.URLError):
                self.urlopen("https://localhost:%s/bizarre" % handler.port,
                             cafile=CERT_fakehostname)

    def test_https_with_cadefault(self):
        handler = self.start_https_server(certfile=CERT_localhost)
        # Self-signed cert should fail verification with system certificate store
        with warnings_helper.check_warnings(('', DeprecationWarning)):
            with self.assertRaises(urllib.error.URLError):
                self.urlopen("https://localhost:%s/bizarre" % handler.port,
                             cadefault=True)

    def test_https_sni(self):
        if ssl is None:
            self.skipTest("ssl module required")
        if not ssl.HAS_SNI:
            self.skipTest("SNI support required in OpenSSL")
        sni_name = None
        def cb_sni(ssl_sock, server_name, initial_context):
            # Record the server name the client asked for.
            nonlocal sni_name
            sni_name = server_name
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.set_servername_callback(cb_sni)
        handler = self.start_https_server(context=context, certfile=CERT_localhost)
        context = ssl.create_default_context(cafile=CERT_localhost)
        self.urlopen("https://localhost:%s" % handler.port, context=context)
        self.assertEqual(sni_name, "localhost")

    def test_sending_headers(self):
        handler = self.start_server()
        req = urllib.request.Request("http://localhost:%s/" % handler.port,
                                     headers={"Range": "bytes=20-39"})
        with urllib.request.urlopen(req):
            pass
        self.assertEqual(handler.headers_received["Range"], "bytes=20-39")

    def test_basic(self):
        handler = self.start_server()
        with urllib.request.urlopen("http://localhost:%s" % handler.port) as open_url:
            for attr in ("read", "close", "info", "geturl"):
                self.assertTrue(hasattr(open_url, attr), "object returned from "
                             "urlopen lacks the %s attribute" % attr)
            self.assertTrue(open_url.read(), "calling 'read' failed")

    def test_info(self):
        handler = self.start_server()
        open_url = urllib.request.urlopen(
            "http://localhost:%s" % handler.port)
        with open_url:
            info_obj = open_url.info()
        self.assertIsInstance(info_obj, email.message.Message,
                              "object returned by 'info' is not an "
                              "instance of email.message.Message")
        self.assertEqual(info_obj.get_content_subtype(), "plain")

    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        handler = self.start_server()
        open_url = urllib.request.urlopen("http://localhost:%s" % handler.port)
        with open_url:
            url = open_url.geturl()
        self.assertEqual(url, "http://localhost:%s" % handler.port)

    def test_iteration(self):
        expected_response = b"pycon 2008..."
        handler = self.start_server([(200, [], expected_response)])
        # Close the response when done so its socket is not leaked.
        with urllib.request.urlopen("http://localhost:%s" % handler.port) as data:
            for line in data:
                self.assertEqual(line, expected_response)

    def test_line_iteration(self):
        lines = [b"We\n", b"got\n", b"here\n", b"verylong " * 8192 + b"\n"]
        expected_response = b"".join(lines)
        handler = self.start_server([(200, [], expected_response)])
        # Stays -1 if the response yields no lines at all, so the final
        # count check fails with an assertion instead of a NameError.
        index = -1
        with urllib.request.urlopen("http://localhost:%s" % handler.port) as data:
            for index, line in enumerate(data):
                self.assertEqual(line, lines[index],
                                 "Fetched line number %s doesn't match expected:\n"
                                 "    Expected length was %s, got %s" %
                                 (index, len(lines[index]), len(line)))
        self.assertEqual(index + 1, len(lines))
662
663
def setUpModule():
    """Register a module-level cleanup for threads started by the tests.

    The values returned by threading_setup() now are handed to
    threading_cleanup() once the module's tests have finished.
    """
    unittest.addModuleCleanup(threading_helper.threading_cleanup,
                              *threading_helper.threading_setup())
667
668
# Run the tests when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
671