• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Test the support for SSL and sockets
2
3import sys
4import unittest
5from test import test_support
6import asyncore
7import socket
8import select
9import time
10import gc
11import os
12import errno
13import pprint
14import urllib, urlparse
15import traceback
16import weakref
17import functools
18import platform
19
20from BaseHTTPServer import HTTPServer
21from SimpleHTTPServer import SimpleHTTPRequestHandler
22
23ssl = test_support.import_module("ssl")
24
25HOST = test_support.HOST
26CERTFILE = None
27SVN_PYTHON_ORG_ROOT_CERT = None
28
29def handle_error(prefix):
30    exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
31    if test_support.verbose:
32        sys.stdout.write(prefix + exc_format)
33
34
35class BasicTests(unittest.TestCase):
36
37    def test_sslwrap_simple(self):
38        # A crude test for the legacy API
39        try:
40            ssl.sslwrap_simple(socket.socket(socket.AF_INET))
41        except IOError, e:
42            if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
43                pass
44            else:
45                raise
46        try:
47            ssl.sslwrap_simple(socket.socket(socket.AF_INET)._sock)
48        except IOError, e:
49            if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that
50                pass
51            else:
52                raise
53
54# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
55def skip_if_broken_ubuntu_ssl(func):
56    if hasattr(ssl, 'PROTOCOL_SSLv2'):
57        # We need to access the lower-level wrapper in order to create an
58        # implicit SSL context without trying to connect or listen.
59        try:
60            import _ssl
61        except ImportError:
62            # The returned function won't get executed, just ignore the error
63            pass
64        @functools.wraps(func)
65        def f(*args, **kwargs):
66            try:
67                s = socket.socket(socket.AF_INET)
68                _ssl.sslwrap(s._sock, 0, None, None,
69                             ssl.CERT_NONE, ssl.PROTOCOL_SSLv2, None, None)
70            except ssl.SSLError as e:
71                if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and
72                    platform.linux_distribution() == ('debian', 'squeeze/sid', '')
73                    and 'Invalid SSL protocol variant specified' in str(e)):
74                    raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
75            return func(*args, **kwargs)
76        return f
77    else:
78        return func
79
80
81class BasicSocketTests(unittest.TestCase):
82
83    def test_constants(self):
84        #ssl.PROTOCOL_SSLv2
85        ssl.PROTOCOL_SSLv23
86        ssl.PROTOCOL_SSLv3
87        ssl.PROTOCOL_TLSv1
88        ssl.CERT_NONE
89        ssl.CERT_OPTIONAL
90        ssl.CERT_REQUIRED
91
92    def test_random(self):
93        v = ssl.RAND_status()
94        if test_support.verbose:
95            sys.stdout.write("\n RAND_status is %d (%s)\n"
96                             % (v, (v and "sufficient randomness") or
97                                "insufficient randomness"))
98        try:
99            ssl.RAND_egd(1)
100        except TypeError:
101            pass
102        else:
103            print "didn't raise TypeError"
104        ssl.RAND_add("this is a random string", 75.0)
105
106    def test_parse_cert(self):
107        # note that this uses an 'unofficial' function in _ssl.c,
108        # provided solely for this test, to exercise the certificate
109        # parsing code
110        p = ssl._ssl._test_decode_cert(CERTFILE, False)
111        if test_support.verbose:
112            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
113
114    def test_DER_to_PEM(self):
115        with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
116            pem = f.read()
117        d1 = ssl.PEM_cert_to_DER_cert(pem)
118        p2 = ssl.DER_cert_to_PEM_cert(d1)
119        d2 = ssl.PEM_cert_to_DER_cert(p2)
120        self.assertEqual(d1, d2)
121        if not p2.startswith(ssl.PEM_HEADER + '\n'):
122            self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
123        if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
124            self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
125
126    def test_openssl_version(self):
127        n = ssl.OPENSSL_VERSION_NUMBER
128        t = ssl.OPENSSL_VERSION_INFO
129        s = ssl.OPENSSL_VERSION
130        self.assertIsInstance(n, (int, long))
131        self.assertIsInstance(t, tuple)
132        self.assertIsInstance(s, str)
133        # Some sanity checks follow
134        # >= 0.9
135        self.assertGreaterEqual(n, 0x900000)
136        # < 2.0
137        self.assertLess(n, 0x20000000)
138        major, minor, fix, patch, status = t
139        self.assertGreaterEqual(major, 0)
140        self.assertLess(major, 2)
141        self.assertGreaterEqual(minor, 0)
142        self.assertLess(minor, 256)
143        self.assertGreaterEqual(fix, 0)
144        self.assertLess(fix, 256)
145        self.assertGreaterEqual(patch, 0)
146        self.assertLessEqual(patch, 26)
147        self.assertGreaterEqual(status, 0)
148        self.assertLessEqual(status, 15)
149        # Version string as returned by OpenSSL, the format might change
150        self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
151                        (s, t))
152
153    def test_ciphers(self):
154        if not test_support.is_resource_enabled('network'):
155            return
156        remote = ("svn.python.org", 443)
157        with test_support.transient_internet(remote[0]):
158            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
159                                cert_reqs=ssl.CERT_NONE, ciphers="ALL")
160            s.connect(remote)
161            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
162                                cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT")
163            s.connect(remote)
164            # Error checking occurs when connecting, because the SSL context
165            # isn't created before.
166            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
167                                cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
168            with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
169                s.connect(remote)
170
171    @test_support.cpython_only
172    def test_refcycle(self):
173        # Issue #7943: an SSL object doesn't create reference cycles with
174        # itself.
175        s = socket.socket(socket.AF_INET)
176        ss = ssl.wrap_socket(s)
177        wr = weakref.ref(ss)
178        del ss
179        self.assertEqual(wr(), None)
180
181    def test_wrapped_unconnected(self):
182        # The _delegate_methods in socket.py are correctly delegated to by an
183        # unconnected SSLSocket, so they will raise a socket.error rather than
184        # something unexpected like TypeError.
185        s = socket.socket(socket.AF_INET)
186        ss = ssl.wrap_socket(s)
187        self.assertRaises(socket.error, ss.recv, 1)
188        self.assertRaises(socket.error, ss.recv_into, bytearray(b'x'))
189        self.assertRaises(socket.error, ss.recvfrom, 1)
190        self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1)
191        self.assertRaises(socket.error, ss.send, b'x')
192        self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0))
193
194
195class NetworkedTests(unittest.TestCase):
196
197    def test_connect(self):
198        with test_support.transient_internet("svn.python.org"):
199            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
200                                cert_reqs=ssl.CERT_NONE)
201            s.connect(("svn.python.org", 443))
202            c = s.getpeercert()
203            if c:
204                self.fail("Peer cert %s shouldn't be here!")
205            s.close()
206
207            # this should fail because we have no verification certs
208            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
209                                cert_reqs=ssl.CERT_REQUIRED)
210            try:
211                s.connect(("svn.python.org", 443))
212            except ssl.SSLError:
213                pass
214            finally:
215                s.close()
216
217            # this should succeed because we specify the root cert
218            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
219                                cert_reqs=ssl.CERT_REQUIRED,
220                                ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
221            try:
222                s.connect(("svn.python.org", 443))
223            finally:
224                s.close()
225
226    def test_connect_ex(self):
227        # Issue #11326: check connect_ex() implementation
228        with test_support.transient_internet("svn.python.org"):
229            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
230                                cert_reqs=ssl.CERT_REQUIRED,
231                                ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
232            try:
233                self.assertEqual(0, s.connect_ex(("svn.python.org", 443)))
234                self.assertTrue(s.getpeercert())
235            finally:
236                s.close()
237
238    def test_non_blocking_connect_ex(self):
239        # Issue #11326: non-blocking connect_ex() should allow handshake
240        # to proceed after the socket gets ready.
241        with test_support.transient_internet("svn.python.org"):
242            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
243                                cert_reqs=ssl.CERT_REQUIRED,
244                                ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
245                                do_handshake_on_connect=False)
246            try:
247                s.setblocking(False)
248                rc = s.connect_ex(('svn.python.org', 443))
249                # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
250                self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
251                # Wait for connect to finish
252                select.select([], [s], [], 5.0)
253                # Non-blocking handshake
254                while True:
255                    try:
256                        s.do_handshake()
257                        break
258                    except ssl.SSLError as err:
259                        if err.args[0] == ssl.SSL_ERROR_WANT_READ:
260                            select.select([s], [], [], 5.0)
261                        elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
262                            select.select([], [s], [], 5.0)
263                        else:
264                            raise
265                # SSL established
266                self.assertTrue(s.getpeercert())
267            finally:
268                s.close()
269
270    @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
271    def test_makefile_close(self):
272        # Issue #5238: creating a file-like object with makefile() shouldn't
273        # delay closing the underlying "real socket" (here tested with its
274        # file descriptor, hence skipping the test under Windows).
275        with test_support.transient_internet("svn.python.org"):
276            ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
277            ss.connect(("svn.python.org", 443))
278            fd = ss.fileno()
279            f = ss.makefile()
280            f.close()
281            # The fd is still open
282            os.read(fd, 0)
283            # Closing the SSL socket should close the fd too
284            ss.close()
285            gc.collect()
286            with self.assertRaises(OSError) as e:
287                os.read(fd, 0)
288            self.assertEqual(e.exception.errno, errno.EBADF)
289
290    def test_non_blocking_handshake(self):
291        with test_support.transient_internet("svn.python.org"):
292            s = socket.socket(socket.AF_INET)
293            s.connect(("svn.python.org", 443))
294            s.setblocking(False)
295            s = ssl.wrap_socket(s,
296                                cert_reqs=ssl.CERT_NONE,
297                                do_handshake_on_connect=False)
298            count = 0
299            while True:
300                try:
301                    count += 1
302                    s.do_handshake()
303                    break
304                except ssl.SSLError, err:
305                    if err.args[0] == ssl.SSL_ERROR_WANT_READ:
306                        select.select([s], [], [])
307                    elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE:
308                        select.select([], [s], [])
309                    else:
310                        raise
311            s.close()
312            if test_support.verbose:
313                sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
314
315    def test_get_server_certificate(self):
316        with test_support.transient_internet("svn.python.org"):
317            pem = ssl.get_server_certificate(("svn.python.org", 443))
318            if not pem:
319                self.fail("No server certificate on svn.python.org:443!")
320
321            try:
322                pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
323            except ssl.SSLError:
324                #should fail
325                pass
326            else:
327                self.fail("Got server certificate %s for svn.python.org!" % pem)
328
329            pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
330            if not pem:
331                self.fail("No server certificate on svn.python.org:443!")
332            if test_support.verbose:
333                sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem)
334
335    def test_algorithms(self):
336        # Issue #8484: all algorithms should be available when verifying a
337        # certificate.
338        # SHA256 was added in OpenSSL 0.9.8
339        if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
340            self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
341        # NOTE: https://sha256.tbs-internet.com is another possible test host
342        remote = ("sha256.tbs-internet.com", 443)
343        sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
344        with test_support.transient_internet("sha256.tbs-internet.com"):
345            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
346                                cert_reqs=ssl.CERT_REQUIRED,
347                                ca_certs=sha256_cert,)
348            try:
349                s.connect(remote)
350                if test_support.verbose:
351                    sys.stdout.write("\nCipher with %r is %r\n" %
352                                     (remote, s.cipher()))
353                    sys.stdout.write("Certificate is:\n%s\n" %
354                                     pprint.pformat(s.getpeercert()))
355            finally:
356                s.close()
357
358
359try:
360    import threading
361except ImportError:
362    _have_threads = False
363else:
364    _have_threads = True
365
366    class ThreadedEchoServer(threading.Thread):
367
368        class ConnectionHandler(threading.Thread):
369
370            """A mildly complicated class, because we want it to work both
371            with and without the SSL wrapper around the socket connection, so
372            that we can test the STARTTLS functionality."""
373
374            def __init__(self, server, connsock):
375                self.server = server
376                self.running = False
377                self.sock = connsock
378                self.sock.setblocking(1)
379                self.sslconn = None
380                threading.Thread.__init__(self)
381                self.daemon = True
382
383            def show_conn_details(self):
384                if self.server.certreqs == ssl.CERT_REQUIRED:
385                    cert = self.sslconn.getpeercert()
386                    if test_support.verbose and self.server.chatty:
387                        sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
388                    cert_binary = self.sslconn.getpeercert(True)
389                    if test_support.verbose and self.server.chatty:
390                        sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
391                cipher = self.sslconn.cipher()
392                if test_support.verbose and self.server.chatty:
393                    sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
394
395            def wrap_conn(self):
396                try:
397                    self.sslconn = ssl.wrap_socket(self.sock, server_side=True,
398                                                   certfile=self.server.certificate,
399                                                   ssl_version=self.server.protocol,
400                                                   ca_certs=self.server.cacerts,
401                                                   cert_reqs=self.server.certreqs,
402                                                   ciphers=self.server.ciphers)
403                except ssl.SSLError:
404                    # XXX Various errors can have happened here, for example
405                    # a mismatching protocol version, an invalid certificate,
406                    # or a low-level bug. This should be made more discriminating.
407                    if self.server.chatty:
408                        handle_error("\n server:  bad connection attempt from " +
409                                     str(self.sock.getpeername()) + ":\n")
410                    self.close()
411                    self.running = False
412                    self.server.stop()
413                    return False
414                else:
415                    return True
416
417            def read(self):
418                if self.sslconn:
419                    return self.sslconn.read()
420                else:
421                    return self.sock.recv(1024)
422
423            def write(self, bytes):
424                if self.sslconn:
425                    return self.sslconn.write(bytes)
426                else:
427                    return self.sock.send(bytes)
428
429            def close(self):
430                if self.sslconn:
431                    self.sslconn.close()
432                else:
433                    self.sock._sock.close()
434
435            def run(self):
436                self.running = True
437                if not self.server.starttls_server:
438                    if isinstance(self.sock, ssl.SSLSocket):
439                        self.sslconn = self.sock
440                    elif not self.wrap_conn():
441                        return
442                    self.show_conn_details()
443                while self.running:
444                    try:
445                        msg = self.read()
446                        if not msg:
447                            # eof, so quit this handler
448                            self.running = False
449                            self.close()
450                        elif msg.strip() == 'over':
451                            if test_support.verbose and self.server.connectionchatty:
452                                sys.stdout.write(" server: client closed connection\n")
453                            self.close()
454                            return
455                        elif self.server.starttls_server and msg.strip() == 'STARTTLS':
456                            if test_support.verbose and self.server.connectionchatty:
457                                sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
458                            self.write("OK\n")
459                            if not self.wrap_conn():
460                                return
461                        elif self.server.starttls_server and self.sslconn and msg.strip() == 'ENDTLS':
462                            if test_support.verbose and self.server.connectionchatty:
463                                sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
464                            self.write("OK\n")
465                            self.sslconn.unwrap()
466                            self.sslconn = None
467                            if test_support.verbose and self.server.connectionchatty:
468                                sys.stdout.write(" server: connection is now unencrypted...\n")
469                        else:
470                            if (test_support.verbose and
471                                self.server.connectionchatty):
472                                ctype = (self.sslconn and "encrypted") or "unencrypted"
473                                sys.stdout.write(" server: read %s (%s), sending back %s (%s)...\n"
474                                                 % (repr(msg), ctype, repr(msg.lower()), ctype))
475                            self.write(msg.lower())
476                    except ssl.SSLError:
477                        if self.server.chatty:
478                            handle_error("Test server failure:\n")
479                        self.close()
480                        self.running = False
481                        # normally, we'd just stop here, but for the test
482                        # harness, we want to stop the server
483                        self.server.stop()
484
485        def __init__(self, certificate, ssl_version=None,
486                     certreqs=None, cacerts=None,
487                     chatty=True, connectionchatty=False, starttls_server=False,
488                     wrap_accepting_socket=False, ciphers=None):
489
490            if ssl_version is None:
491                ssl_version = ssl.PROTOCOL_TLSv1
492            if certreqs is None:
493                certreqs = ssl.CERT_NONE
494            self.certificate = certificate
495            self.protocol = ssl_version
496            self.certreqs = certreqs
497            self.cacerts = cacerts
498            self.ciphers = ciphers
499            self.chatty = chatty
500            self.connectionchatty = connectionchatty
501            self.starttls_server = starttls_server
502            self.sock = socket.socket()
503            self.flag = None
504            if wrap_accepting_socket:
505                self.sock = ssl.wrap_socket(self.sock, server_side=True,
506                                            certfile=self.certificate,
507                                            cert_reqs = self.certreqs,
508                                            ca_certs = self.cacerts,
509                                            ssl_version = self.protocol,
510                                            ciphers = self.ciphers)
511                if test_support.verbose and self.chatty:
512                    sys.stdout.write(' server:  wrapped server socket as %s\n' % str(self.sock))
513            self.port = test_support.bind_port(self.sock)
514            self.active = False
515            threading.Thread.__init__(self)
516            self.daemon = True
517
518        def start(self, flag=None):
519            self.flag = flag
520            threading.Thread.start(self)
521
522        def run(self):
523            self.sock.settimeout(0.05)
524            self.sock.listen(5)
525            self.active = True
526            if self.flag:
527                # signal an event
528                self.flag.set()
529            while self.active:
530                try:
531                    newconn, connaddr = self.sock.accept()
532                    if test_support.verbose and self.chatty:
533                        sys.stdout.write(' server:  new connection from '
534                                         + str(connaddr) + '\n')
535                    handler = self.ConnectionHandler(self, newconn)
536                    handler.start()
537                except socket.timeout:
538                    pass
539                except KeyboardInterrupt:
540                    self.stop()
541            self.sock.close()
542
543        def stop(self):
544            self.active = False
545
546    class AsyncoreEchoServer(threading.Thread):
547
548        class EchoServer(asyncore.dispatcher):
549
550            class ConnectionHandler(asyncore.dispatcher_with_send):
551
552                def __init__(self, conn, certfile):
553                    asyncore.dispatcher_with_send.__init__(self, conn)
554                    self.socket = ssl.wrap_socket(conn, server_side=True,
555                                                  certfile=certfile,
556                                                  do_handshake_on_connect=False)
557                    self._ssl_accepting = True
558
559                def readable(self):
560                    if isinstance(self.socket, ssl.SSLSocket):
561                        while self.socket.pending() > 0:
562                            self.handle_read_event()
563                    return True
564
565                def _do_ssl_handshake(self):
566                    try:
567                        self.socket.do_handshake()
568                    except ssl.SSLError, err:
569                        if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
570                                           ssl.SSL_ERROR_WANT_WRITE):
571                            return
572                        elif err.args[0] == ssl.SSL_ERROR_EOF:
573                            return self.handle_close()
574                        raise
575                    except socket.error, err:
576                        if err.args[0] == errno.ECONNABORTED:
577                            return self.handle_close()
578                    else:
579                        self._ssl_accepting = False
580
581                def handle_read(self):
582                    if self._ssl_accepting:
583                        self._do_ssl_handshake()
584                    else:
585                        data = self.recv(1024)
586                        if data and data.strip() != 'over':
587                            self.send(data.lower())
588
589                def handle_close(self):
590                    self.close()
591                    if test_support.verbose:
592                        sys.stdout.write(" server:  closed connection %s\n" % self.socket)
593
594                def handle_error(self):
595                    raise
596
597            def __init__(self, certfile):
598                self.certfile = certfile
599                asyncore.dispatcher.__init__(self)
600                self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
601                self.port = test_support.bind_port(self.socket)
602                self.listen(5)
603
604            def handle_accept(self):
605                sock_obj, addr = self.accept()
606                if test_support.verbose:
607                    sys.stdout.write(" server:  new connection from %s:%s\n" %addr)
608                self.ConnectionHandler(sock_obj, self.certfile)
609
610            def handle_error(self):
611                raise
612
613        def __init__(self, certfile):
614            self.flag = None
615            self.active = False
616            self.server = self.EchoServer(certfile)
617            self.port = self.server.port
618            threading.Thread.__init__(self)
619            self.daemon = True
620
621        def __str__(self):
622            return "<%s %s>" % (self.__class__.__name__, self.server)
623
624        def start(self, flag=None):
625            self.flag = flag
626            threading.Thread.start(self)
627
628        def run(self):
629            self.active = True
630            if self.flag:
631                self.flag.set()
632            while self.active:
633                asyncore.loop(0.05)
634
635        def stop(self):
636            self.active = False
637            self.server.close()
638
639    class SocketServerHTTPSServer(threading.Thread):
640
641        class HTTPSServer(HTTPServer):
642
643            def __init__(self, server_address, RequestHandlerClass, certfile):
644                HTTPServer.__init__(self, server_address, RequestHandlerClass)
645                # we assume the certfile contains both private key and certificate
646                self.certfile = certfile
647                self.allow_reuse_address = True
648
649            def __str__(self):
650                return ('<%s %s:%s>' %
651                        (self.__class__.__name__,
652                         self.server_name,
653                         self.server_port))
654
655            def get_request(self):
656                # override this to wrap socket with SSL
657                sock, addr = self.socket.accept()
658                sslconn = ssl.wrap_socket(sock, server_side=True,
659                                          certfile=self.certfile)
660                return sslconn, addr
661
662        class RootedHTTPRequestHandler(SimpleHTTPRequestHandler):
663            # need to override translate_path to get a known root,
664            # instead of using os.curdir, since the test could be
665            # run from anywhere
666
667            server_version = "TestHTTPS/1.0"
668
669            root = None
670
671            def translate_path(self, path):
672                """Translate a /-separated PATH to the local filename syntax.
673
674                Components that mean special things to the local file system
675                (e.g. drive or directory names) are ignored.  (XXX They should
676                probably be diagnosed.)
677
678                """
679                # abandon query parameters
680                path = urlparse.urlparse(path)[2]
681                path = os.path.normpath(urllib.unquote(path))
682                words = path.split('/')
683                words = filter(None, words)
684                path = self.root
685                for word in words:
686                    drive, word = os.path.splitdrive(word)
687                    head, word = os.path.split(word)
688                    if word in self.root: continue
689                    path = os.path.join(path, word)
690                return path
691
692            def log_message(self, format, *args):
693
694                # we override this to suppress logging unless "verbose"
695
696                if test_support.verbose:
697                    sys.stdout.write(" server (%s:%d %s):\n   [%s] %s\n" %
698                                     (self.server.server_address,
699                                      self.server.server_port,
700                                      self.request.cipher(),
701                                      self.log_date_time_string(),
702                                      format%args))
703
704
705        def __init__(self, certfile):
706            self.flag = None
707            self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0]
708            self.server = self.HTTPSServer(
709                (HOST, 0), self.RootedHTTPRequestHandler, certfile)
710            self.port = self.server.server_port
711            threading.Thread.__init__(self)
712            self.daemon = True
713
714        def __str__(self):
715            return "<%s %s>" % (self.__class__.__name__, self.server)
716
717        def start(self, flag=None):
718            self.flag = flag
719            threading.Thread.start(self)
720
721        def run(self):
722            if self.flag:
723                self.flag.set()
724            self.server.serve_forever(0.05)
725
726        def stop(self):
727            self.server.shutdown()
728
729
730    def bad_cert_test(certfile):
731        """
732        Launch a server with CERT_REQUIRED, and check that trying to
733        connect to it with the given client certificate fails.
734        """
735        server = ThreadedEchoServer(CERTFILE,
736                                    certreqs=ssl.CERT_REQUIRED,
737                                    cacerts=CERTFILE, chatty=False)
738        flag = threading.Event()
739        server.start(flag)
740        # wait for it to start
741        flag.wait()
742        # try to connect
743        try:
744            try:
745                s = ssl.wrap_socket(socket.socket(),
746                                    certfile=certfile,
747                                    ssl_version=ssl.PROTOCOL_TLSv1)
748                s.connect((HOST, server.port))
749            except ssl.SSLError, x:
750                if test_support.verbose:
751                    sys.stdout.write("\nSSLError is %s\n" % x[1])
752            except socket.error, x:
753                if test_support.verbose:
754                    sys.stdout.write("\nsocket.error is %s\n" % x[1])
755            else:
756                raise AssertionError("Use of invalid cert should have failed!")
757        finally:
758            server.stop()
759            server.join()
760
761    def server_params_test(certfile, protocol, certreqs, cacertsfile,
762                           client_certfile, client_protocol=None, indata="FOO\n",
763                           ciphers=None, chatty=True, connectionchatty=False,
764                           wrap_accepting_socket=False):
765        """
766        Launch a server, connect a client to it and try various reads
767        and writes.
768        """
769        server = ThreadedEchoServer(certfile,
770                                    certreqs=certreqs,
771                                    ssl_version=protocol,
772                                    cacerts=cacertsfile,
773                                    ciphers=ciphers,
774                                    chatty=chatty,
775                                    connectionchatty=connectionchatty,
776                                    wrap_accepting_socket=wrap_accepting_socket)
777        flag = threading.Event()
778        server.start(flag)
779        # wait for it to start
780        flag.wait()
781        # try to connect
782        if client_protocol is None:
783            client_protocol = protocol
784        try:
785            s = ssl.wrap_socket(socket.socket(),
786                                certfile=client_certfile,
787                                ca_certs=cacertsfile,
788                                ciphers=ciphers,
789                                cert_reqs=certreqs,
790                                ssl_version=client_protocol)
791            s.connect((HOST, server.port))
792            for arg in [indata, bytearray(indata), memoryview(indata)]:
793                if connectionchatty:
794                    if test_support.verbose:
795                        sys.stdout.write(
796                            " client:  sending %s...\n" % (repr(arg)))
797                s.write(arg)
798                outdata = s.read()
799                if connectionchatty:
800                    if test_support.verbose:
801                        sys.stdout.write(" client:  read %s\n" % repr(outdata))
802                if outdata != indata.lower():
803                    raise AssertionError(
804                        "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
805                        % (outdata[:min(len(outdata),20)], len(outdata),
806                           indata[:min(len(indata),20)].lower(), len(indata)))
807            s.write("over\n")
808            if connectionchatty:
809                if test_support.verbose:
810                    sys.stdout.write(" client:  closing connection.\n")
811            s.close()
812        finally:
813            server.stop()
814            server.join()
815
816    def try_protocol_combo(server_protocol,
817                           client_protocol,
818                           expect_success,
819                           certsreqs=None):
820        if certsreqs is None:
821            certsreqs = ssl.CERT_NONE
822        certtype = {
823            ssl.CERT_NONE: "CERT_NONE",
824            ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
825            ssl.CERT_REQUIRED: "CERT_REQUIRED",
826        }[certsreqs]
827        if test_support.verbose:
828            formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
829            sys.stdout.write(formatstr %
830                             (ssl.get_protocol_name(client_protocol),
831                              ssl.get_protocol_name(server_protocol),
832                              certtype))
833        try:
834            # NOTE: we must enable "ALL" ciphers, otherwise an SSLv23 client
835            # will send an SSLv3 hello (rather than SSLv2) starting from
836            # OpenSSL 1.0.0 (see issue #8322).
837            server_params_test(CERTFILE, server_protocol, certsreqs,
838                               CERTFILE, CERTFILE, client_protocol,
839                               ciphers="ALL", chatty=False)
840        # Protocol mismatch can result in either an SSLError, or a
841        # "Connection reset by peer" error.
842        except ssl.SSLError:
843            if expect_success:
844                raise
845        except socket.error as e:
846            if expect_success or e.errno != errno.ECONNRESET:
847                raise
848        else:
849            if not expect_success:
850                raise AssertionError(
851                    "Client protocol %s succeeded with server protocol %s!"
852                    % (ssl.get_protocol_name(client_protocol),
853                       ssl.get_protocol_name(server_protocol)))
854
855
856    class ThreadedTests(unittest.TestCase):
857
858        def test_rude_shutdown(self):
859            """A brutal shutdown of an SSL server should raise an IOError
860            in the client when attempting handshake.
861            """
862            listener_ready = threading.Event()
863            listener_gone = threading.Event()
864
865            s = socket.socket()
866            port = test_support.bind_port(s, HOST)
867
868            # `listener` runs in a thread.  It sits in an accept() until
869            # the main thread connects.  Then it rudely closes the socket,
870            # and sets Event `listener_gone` to let the main thread know
871            # the socket is gone.
872            def listener():
873                s.listen(5)
874                listener_ready.set()
875                s.accept()
876                s.close()
877                listener_gone.set()
878
879            def connector():
880                listener_ready.wait()
881                c = socket.socket()
882                c.connect((HOST, port))
883                listener_gone.wait()
884                try:
885                    ssl_sock = ssl.wrap_socket(c)
886                except IOError:
887                    pass
888                else:
889                    self.fail('connecting to closed SSL socket should have failed')
890
891            t = threading.Thread(target=listener)
892            t.start()
893            try:
894                connector()
895            finally:
896                t.join()
897
898        @skip_if_broken_ubuntu_ssl
899        def test_echo(self):
900            """Basic test of an SSL client connecting to a server"""
901            if test_support.verbose:
902                sys.stdout.write("\n")
903            server_params_test(CERTFILE, ssl.PROTOCOL_TLSv1, ssl.CERT_NONE,
904                               CERTFILE, CERTFILE, ssl.PROTOCOL_TLSv1,
905                               chatty=True, connectionchatty=True)
906
907        def test_getpeercert(self):
908            if test_support.verbose:
909                sys.stdout.write("\n")
910            s2 = socket.socket()
911            server = ThreadedEchoServer(CERTFILE,
912                                        certreqs=ssl.CERT_NONE,
913                                        ssl_version=ssl.PROTOCOL_SSLv23,
914                                        cacerts=CERTFILE,
915                                        chatty=False)
916            flag = threading.Event()
917            server.start(flag)
918            # wait for it to start
919            flag.wait()
920            # try to connect
921            try:
922                s = ssl.wrap_socket(socket.socket(),
923                                    certfile=CERTFILE,
924                                    ca_certs=CERTFILE,
925                                    cert_reqs=ssl.CERT_REQUIRED,
926                                    ssl_version=ssl.PROTOCOL_SSLv23)
927                s.connect((HOST, server.port))
928                cert = s.getpeercert()
929                self.assertTrue(cert, "Can't get peer certificate.")
930                cipher = s.cipher()
931                if test_support.verbose:
932                    sys.stdout.write(pprint.pformat(cert) + '\n')
933                    sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
934                if 'subject' not in cert:
935                    self.fail("No subject field in certificate: %s." %
936                              pprint.pformat(cert))
937                if ((('organizationName', 'Python Software Foundation'),)
938                    not in cert['subject']):
939                    self.fail(
940                        "Missing or invalid 'organizationName' field in certificate subject; "
941                        "should be 'Python Software Foundation'.")
942                s.close()
943            finally:
944                server.stop()
945                server.join()
946
947        def test_empty_cert(self):
948            """Connecting with an empty cert file"""
949            bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
950                                      "nullcert.pem"))
951        def test_malformed_cert(self):
952            """Connecting with a badly formatted certificate (syntax error)"""
953            bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
954                                       "badcert.pem"))
955        def test_nonexisting_cert(self):
956            """Connecting with a non-existing cert file"""
957            bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
958                                       "wrongcert.pem"))
959        def test_malformed_key(self):
960            """Connecting with a badly formatted key (syntax error)"""
961            bad_cert_test(os.path.join(os.path.dirname(__file__) or os.curdir,
962                                       "badkey.pem"))
963
964        @skip_if_broken_ubuntu_ssl
965        def test_protocol_sslv2(self):
966            """Connecting to an SSLv2 server with various client options"""
967            if test_support.verbose:
968                sys.stdout.write("\n")
969            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
970            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
971            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
972            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True)
973            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
974            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
975
976        @skip_if_broken_ubuntu_ssl
977        def test_protocol_sslv23(self):
978            """Connecting to an SSLv23 server with various client options"""
979            if test_support.verbose:
980                sys.stdout.write("\n")
981            try:
982                try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
983            except (ssl.SSLError, socket.error), x:
984                # this fails on some older versions of OpenSSL (0.9.7l, for instance)
985                if test_support.verbose:
986                    sys.stdout.write(
987                        " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
988                        % str(x))
989            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True)
990            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
991            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True)
992
993            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
994            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
995            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
996
997            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
998            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
999            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
1000
1001        @skip_if_broken_ubuntu_ssl
1002        def test_protocol_sslv3(self):
1003            """Connecting to an SSLv3 server with various client options"""
1004            if test_support.verbose:
1005                sys.stdout.write("\n")
1006            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True)
1007            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_OPTIONAL)
1008            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, True, ssl.CERT_REQUIRED)
1009            if hasattr(ssl, 'PROTOCOL_SSLv2'):
1010                try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
1011            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False)
1012            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
1013
1014        @skip_if_broken_ubuntu_ssl
1015        def test_protocol_tlsv1(self):
1016            """Connecting to a TLSv1 server with various client options"""
1017            if test_support.verbose:
1018                sys.stdout.write("\n")
1019            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True)
1020            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_OPTIONAL)
1021            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, True, ssl.CERT_REQUIRED)
1022            if hasattr(ssl, 'PROTOCOL_SSLv2'):
1023                try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
1024            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
1025            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False)
1026
1027        def test_starttls(self):
1028            """Switching from clear text to encrypted and back again."""
1029            msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4", "ENDTLS", "msg 5", "msg 6")
1030
1031            server = ThreadedEchoServer(CERTFILE,
1032                                        ssl_version=ssl.PROTOCOL_TLSv1,
1033                                        starttls_server=True,
1034                                        chatty=True,
1035                                        connectionchatty=True)
1036            flag = threading.Event()
1037            server.start(flag)
1038            # wait for it to start
1039            flag.wait()
1040            # try to connect
1041            wrapped = False
1042            try:
1043                s = socket.socket()
1044                s.setblocking(1)
1045                s.connect((HOST, server.port))
1046                if test_support.verbose:
1047                    sys.stdout.write("\n")
1048                for indata in msgs:
1049                    if test_support.verbose:
1050                        sys.stdout.write(
1051                            " client:  sending %s...\n" % repr(indata))
1052                    if wrapped:
1053                        conn.write(indata)
1054                        outdata = conn.read()
1055                    else:
1056                        s.send(indata)
1057                        outdata = s.recv(1024)
1058                    if (indata == "STARTTLS" and
1059                        outdata.strip().lower().startswith("ok")):
1060                        # STARTTLS ok, switch to secure mode
1061                        if test_support.verbose:
1062                            sys.stdout.write(
1063                                " client:  read %s from server, starting TLS...\n"
1064                                % repr(outdata))
1065                        conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
1066                        wrapped = True
1067                    elif (indata == "ENDTLS" and
1068                        outdata.strip().lower().startswith("ok")):
1069                        # ENDTLS ok, switch back to clear text
1070                        if test_support.verbose:
1071                            sys.stdout.write(
1072                                " client:  read %s from server, ending TLS...\n"
1073                                % repr(outdata))
1074                        s = conn.unwrap()
1075                        wrapped = False
1076                    else:
1077                        if test_support.verbose:
1078                            sys.stdout.write(
1079                                " client:  read %s from server\n" % repr(outdata))
1080                if test_support.verbose:
1081                    sys.stdout.write(" client:  closing connection.\n")
1082                if wrapped:
1083                    conn.write("over\n")
1084                else:
1085                    s.send("over\n")
1086                s.close()
1087            finally:
1088                server.stop()
1089                server.join()
1090
1091        def test_socketserver(self):
1092            """Using a SocketServer to create and manage SSL connections."""
1093            server = SocketServerHTTPSServer(CERTFILE)
1094            flag = threading.Event()
1095            server.start(flag)
1096            # wait for it to start
1097            flag.wait()
1098            # try to connect
1099            try:
1100                if test_support.verbose:
1101                    sys.stdout.write('\n')
1102                with open(CERTFILE, 'rb') as f:
1103                    d1 = f.read()
1104                d2 = ''
1105                # now fetch the same data from the HTTPS server
1106                url = 'https://127.0.0.1:%d/%s' % (
1107                    server.port, os.path.split(CERTFILE)[1])
1108                with test_support.check_py3k_warnings():
1109                    f = urllib.urlopen(url)
1110                dlen = f.info().getheader("content-length")
1111                if dlen and (int(dlen) > 0):
1112                    d2 = f.read(int(dlen))
1113                    if test_support.verbose:
1114                        sys.stdout.write(
1115                            " client: read %d bytes from remote server '%s'\n"
1116                            % (len(d2), server))
1117                f.close()
1118                self.assertEqual(d1, d2)
1119            finally:
1120                server.stop()
1121                server.join()
1122
1123        def test_wrapped_accept(self):
1124            """Check the accept() method on SSL sockets."""
1125            if test_support.verbose:
1126                sys.stdout.write("\n")
1127            server_params_test(CERTFILE, ssl.PROTOCOL_SSLv23, ssl.CERT_REQUIRED,
1128                               CERTFILE, CERTFILE, ssl.PROTOCOL_SSLv23,
1129                               chatty=True, connectionchatty=True,
1130                               wrap_accepting_socket=True)
1131
1132        def test_asyncore_server(self):
1133            """Check the example asyncore integration."""
1134            indata = "TEST MESSAGE of mixed case\n"
1135
1136            if test_support.verbose:
1137                sys.stdout.write("\n")
1138            server = AsyncoreEchoServer(CERTFILE)
1139            flag = threading.Event()
1140            server.start(flag)
1141            # wait for it to start
1142            flag.wait()
1143            # try to connect
1144            try:
1145                s = ssl.wrap_socket(socket.socket())
1146                s.connect(('127.0.0.1', server.port))
1147                if test_support.verbose:
1148                    sys.stdout.write(
1149                        " client:  sending %s...\n" % (repr(indata)))
1150                s.write(indata)
1151                outdata = s.read()
1152                if test_support.verbose:
1153                    sys.stdout.write(" client:  read %s\n" % repr(outdata))
1154                if outdata != indata.lower():
1155                    self.fail(
1156                        "bad data <<%s>> (%d) received; expected <<%s>> (%d)\n"
1157                        % (outdata[:min(len(outdata),20)], len(outdata),
1158                           indata[:min(len(indata),20)].lower(), len(indata)))
1159                s.write("over\n")
1160                if test_support.verbose:
1161                    sys.stdout.write(" client:  closing connection.\n")
1162                s.close()
1163            finally:
1164                server.stop()
1165                # wait for server thread to end
1166                server.join()
1167
1168        def test_recv_send(self):
1169            """Test recv(), send() and friends."""
1170            if test_support.verbose:
1171                sys.stdout.write("\n")
1172
1173            server = ThreadedEchoServer(CERTFILE,
1174                                        certreqs=ssl.CERT_NONE,
1175                                        ssl_version=ssl.PROTOCOL_TLSv1,
1176                                        cacerts=CERTFILE,
1177                                        chatty=True,
1178                                        connectionchatty=False)
1179            flag = threading.Event()
1180            server.start(flag)
1181            # wait for it to start
1182            flag.wait()
1183            # try to connect
1184            s = ssl.wrap_socket(socket.socket(),
1185                                server_side=False,
1186                                certfile=CERTFILE,
1187                                ca_certs=CERTFILE,
1188                                cert_reqs=ssl.CERT_NONE,
1189                                ssl_version=ssl.PROTOCOL_TLSv1)
1190            s.connect((HOST, server.port))
1191            try:
1192                # helper methods for standardising recv* method signatures
1193                def _recv_into():
1194                    b = bytearray("\0"*100)
1195                    count = s.recv_into(b)
1196                    return b[:count]
1197
1198                def _recvfrom_into():
1199                    b = bytearray("\0"*100)
1200                    count, addr = s.recvfrom_into(b)
1201                    return b[:count]
1202
1203                # (name, method, whether to expect success, *args)
1204                send_methods = [
1205                    ('send', s.send, True, []),
1206                    ('sendto', s.sendto, False, ["some.address"]),
1207                    ('sendall', s.sendall, True, []),
1208                ]
1209                recv_methods = [
1210                    ('recv', s.recv, True, []),
1211                    ('recvfrom', s.recvfrom, False, ["some.address"]),
1212                    ('recv_into', _recv_into, True, []),
1213                    ('recvfrom_into', _recvfrom_into, False, []),
1214                ]
1215                data_prefix = u"PREFIX_"
1216
1217                for meth_name, send_meth, expect_success, args in send_methods:
1218                    indata = data_prefix + meth_name
1219                    try:
1220                        send_meth(indata.encode('ASCII', 'strict'), *args)
1221                        outdata = s.read()
1222                        outdata = outdata.decode('ASCII', 'strict')
1223                        if outdata != indata.lower():
1224                            self.fail(
1225                                "While sending with <<%s>> bad data "
1226                                "<<%r>> (%d) received; "
1227                                "expected <<%r>> (%d)\n" % (
1228                                    meth_name, outdata[:20], len(outdata),
1229                                    indata[:20], len(indata)
1230                                )
1231                            )
1232                    except ValueError as e:
1233                        if expect_success:
1234                            self.fail(
1235                                "Failed to send with method <<%s>>; "
1236                                "expected to succeed.\n" % (meth_name,)
1237                            )
1238                        if not str(e).startswith(meth_name):
1239                            self.fail(
1240                                "Method <<%s>> failed with unexpected "
1241                                "exception message: %s\n" % (
1242                                    meth_name, e
1243                                )
1244                            )
1245
1246                for meth_name, recv_meth, expect_success, args in recv_methods:
1247                    indata = data_prefix + meth_name
1248                    try:
1249                        s.send(indata.encode('ASCII', 'strict'))
1250                        outdata = recv_meth(*args)
1251                        outdata = outdata.decode('ASCII', 'strict')
1252                        if outdata != indata.lower():
1253                            self.fail(
1254                                "While receiving with <<%s>> bad data "
1255                                "<<%r>> (%d) received; "
1256                                "expected <<%r>> (%d)\n" % (
1257                                    meth_name, outdata[:20], len(outdata),
1258                                    indata[:20], len(indata)
1259                                )
1260                            )
1261                    except ValueError as e:
1262                        if expect_success:
1263                            self.fail(
1264                                "Failed to receive with method <<%s>>; "
1265                                "expected to succeed.\n" % (meth_name,)
1266                            )
1267                        if not str(e).startswith(meth_name):
1268                            self.fail(
1269                                "Method <<%s>> failed with unexpected "
1270                                "exception message: %s\n" % (
1271                                    meth_name, e
1272                                )
1273                            )
1274                        # consume data
1275                        s.read()
1276
1277                s.write("over\n".encode("ASCII", "strict"))
1278                s.close()
1279            finally:
1280                server.stop()
1281                server.join()
1282
1283        def test_handshake_timeout(self):
1284            # Issue #5103: SSL handshake must respect the socket timeout
1285            server = socket.socket(socket.AF_INET)
1286            host = "127.0.0.1"
1287            port = test_support.bind_port(server)
1288            started = threading.Event()
1289            finish = False
1290
1291            def serve():
1292                server.listen(5)
1293                started.set()
1294                conns = []
1295                while not finish:
1296                    r, w, e = select.select([server], [], [], 0.1)
1297                    if server in r:
1298                        # Let the socket hang around rather than having
1299                        # it closed by garbage collection.
1300                        conns.append(server.accept()[0])
1301
1302            t = threading.Thread(target=serve)
1303            t.start()
1304            started.wait()
1305
1306            try:
1307                try:
1308                    c = socket.socket(socket.AF_INET)
1309                    c.settimeout(0.2)
1310                    c.connect((host, port))
1311                    # Will attempt handshake and time out
1312                    self.assertRaisesRegexp(ssl.SSLError, "timed out",
1313                                            ssl.wrap_socket, c)
1314                finally:
1315                    c.close()
1316                try:
1317                    c = socket.socket(socket.AF_INET)
1318                    c.settimeout(0.2)
1319                    c = ssl.wrap_socket(c)
1320                    # Will attempt handshake and time out
1321                    self.assertRaisesRegexp(ssl.SSLError, "timed out",
1322                                            c.connect, (host, port))
1323                finally:
1324                    c.close()
1325            finally:
1326                finish = True
1327                t.join()
1328                server.close()
1329
1330
1331def test_main(verbose=False):
1332    global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT
1333    CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
1334                            "keycert.pem")
1335    SVN_PYTHON_ORG_ROOT_CERT = os.path.join(
1336        os.path.dirname(__file__) or os.curdir,
1337        "https_svn_python_org_root.pem")
1338
1339    if (not os.path.exists(CERTFILE) or
1340        not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT)):
1341        raise test_support.TestFailed("Can't read certificate files!")
1342
1343    tests = [BasicTests, BasicSocketTests]
1344
1345    if test_support.is_resource_enabled('network'):
1346        tests.append(NetworkedTests)
1347
1348    if _have_threads:
1349        thread_info = test_support.threading_setup()
1350        if thread_info and test_support.is_resource_enabled('network'):
1351            tests.append(ThreadedTests)
1352
1353    try:
1354        test_support.run_unittest(*tests)
1355    finally:
1356        if _have_threads:
1357            test_support.threading_cleanup(*thread_info)
1358
1359if __name__ == "__main__":
1360    test_main()
1361