• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# -*- coding: utf-8 -*-
2# Test the support for SSL and sockets
3
4import sys
5import unittest
6from test import test_support as support
7from test.script_helper import assert_python_ok
8import asyncore
9import socket
10import select
11import time
12import datetime
13import gc
14import os
15import errno
16import pprint
17import tempfile
18import urllib2
19import traceback
20import weakref
21import platform
22import functools
23from contextlib import closing
24
# Obtain the ssl module through the test-support import helper instead of
# a plain import statement.
ssl = support.import_module("ssl")

# Sorted entries of ssl._PROTOCOL_NAMES; tests iterate over this sequence.
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST
# Which TLS library the interpreter reports, judged from its version string
# and version tuple.
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
IS_OPENSSL_1_1 = ssl.OPENSSL_VERSION_INFO >= (1, 1, 0) and not IS_LIBRESSL
31
32
def data_file(*name):
    """Return the path of a data file stored next to this module.

    *name* is one or more path components; they are joined below the
    directory that contains this file.
    """
    here = os.path.dirname(__file__)
    return os.path.join(here, *name)
35
# The custom key and certificate files used in test_ssl are generated
# using Lib/test/make_ssl_certs.py.
# Other certificates are simply fetched from the Internet servers they
# are meant to authenticate.

# Encoding applied to produce the byte-string variants of the paths below.
_FS_ENCODING = sys.getfilesystemencoding()

CERTFILE = data_file("keycert.pem")
BYTES_CERTFILE = CERTFILE.encode(_FS_ENCODING)
ONLYCERT = data_file("ssl_cert.pem")
ONLYKEY = data_file("ssl_key.pem")
BYTES_ONLYCERT = ONLYCERT.encode(_FS_ENCODING)
BYTES_ONLYKEY = ONLYKEY.encode(_FS_ENCODING)
CERTFILE_PROTECTED = data_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem")
KEY_PASSWORD = "somepass"
CAPATH = data_file("capath")
BYTES_CAPATH = CAPATH.encode(_FS_ENCODING)
CAFILE_NEURONIO = data_file("capath", "4e1295a3.0")
CAFILE_CACERT = data_file("capath", "5ed36f99.0")


# empty CRL
CRLFILE = data_file("revocation.crl")

# Two keys and certs signed by the same CA (for SNI tests)
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNING_CA = data_file("pycacert.pem")
# cert with all kinds of subject alt names
ALLSANFILE = data_file("allsans.pem")

REMOTE_HOST = "self-signed.pythontest.net"
REMOTE_ROOT_CERT = data_file("selfsigned_pythontestdotnet.pem")

EMPTYCERT = data_file("nullcert.pem")
BADCERT = data_file("badcert.pem")
NONEXISTINGCERT = data_file("XXXnonexisting.pem")
BADKEY = data_file("badkey.pem")
NOKIACERT = data_file("nokia.pem")
NULLBYTECERT = data_file("nullbytecert.pem")

DHFILE = data_file("dh1024.pem")
BYTES_DHFILE = DHFILE.encode(_FS_ENCODING)
78
79
def handle_error(prefix):
    """Write *prefix* followed by the current exception's traceback to stdout.

    Nothing is written unless ``support.verbose`` is set.  The traceback is
    only formatted when it is actually going to be written, so quiet runs
    do not pay for building a string that is thrown away.
    """
    if not support.verbose:
        return
    exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
    sys.stdout.write(prefix + exc_format)
84
85
class BasicTests(unittest.TestCase):

    def test_sslwrap_simple(self):
        # A crude test for the legacy API
        def wrap_ignoring_broken_pipe(sock):
            # A broken pipe raised while the handshake is attempted is
            # irrelevant here; any other IOError is a real failure.
            try:
                ssl.sslwrap_simple(sock)
            except IOError as e:
                if e.errno != errno.EPIPE:
                    raise

        # Exercise both the high-level socket object and the low-level
        # object it exposes as ``_sock``.
        wrap_ignoring_broken_pipe(socket.socket(socket.AF_INET))
        wrap_ignoring_broken_pipe(socket.socket(socket.AF_INET)._sock)
104
105
def can_clear_options():
    """Return True when the OpenSSL API version is 0.9.8m or higher."""
    minimum = (0, 9, 8, 13, 15)  # 0.9.8m
    return ssl._OPENSSL_API_VERSION >= minimum
109
def no_sslv2_implies_sslv3_hello():
    """Return True when the OpenSSL version is 0.9.7h or higher."""
    minimum = (0, 9, 7, 8, 15)  # 0.9.7h
    return ssl.OPENSSL_VERSION_INFO >= minimum
113
def have_verify_flags():
    """Return True when the OpenSSL version is 0.9.8 or higher."""
    minimum = (0, 9, 8, 0, 15)  # 0.9.8
    return ssl.OPENSSL_VERSION_INFO >= minimum
117
def utc_offset():
    """Return the local UTC offset in seconds (local time = UTC + offset).

    NOTE: issues like #1647654 are deliberately ignored.
    """
    dst_active = time.daylight and time.localtime().tm_isdst > 0
    seconds_west = time.altzone if dst_active else time.timezone
    return -seconds_west
123
def asn1time(cert_time):
    """Return *cert_time* adjusted to what the linked OpenSSL would print.

    Some versions of OpenSSL ignore seconds, see #18207.  Only API version
    0.9.8i is special-cased; for every other version the string is
    returned unchanged.
    """
    if ssl._OPENSSL_API_VERSION != (0, 9, 8, 9, 15):  # not 0.9.8i
        return cert_time

    fmt = "%b %d %H:%M:%S %Y GMT"
    without_seconds = datetime.datetime.strptime(cert_time, fmt).replace(second=0)
    formatted = without_seconds.strftime(fmt)
    # %d adds leading zero but ASN1_TIME_print() uses leading space
    if formatted[4] == "0":
        formatted = formatted[:4] + " " + formatted[5:]
    return formatted
137
# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2
def skip_if_broken_ubuntu_ssl(func):
    """Decorate *func* so it is skipped on the patched OpenSSL build.

    When the ssl module has no PROTOCOL_SSLv2 attribute, *func* is returned
    unchanged.  Otherwise the wrapper first tries to create an SSLv2
    context; if that raises SSLError on the specific OpenSSL version and
    distribution checked below, unittest.SkipTest is raised.  In every
    other case *func* is called normally.
    """
    if not hasattr(ssl, 'PROTOCOL_SSLv2'):
        return func

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            ssl.SSLContext(ssl.PROTOCOL_SSLv2)
        except ssl.SSLError:
            affected_openssl = ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15)
            if (affected_openssl and
                    platform.linux_distribution() == ('debian', 'squeeze/sid', '')):
                raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour")
        return func(*args, **kwargs)
    return wrapper
153
# Decorator that skips a test unless ssl.HAS_SNI is true.
needs_sni = unittest.skipUnless(
    ssl.HAS_SNI,
    "SNI support needed for this test")
155
156
157class BasicSocketTests(unittest.TestCase):
158
159    def test_constants(self):
160        ssl.CERT_NONE
161        ssl.CERT_OPTIONAL
162        ssl.CERT_REQUIRED
163        ssl.OP_CIPHER_SERVER_PREFERENCE
164        ssl.OP_SINGLE_DH_USE
165        if ssl.HAS_ECDH:
166            ssl.OP_SINGLE_ECDH_USE
167        if ssl.OPENSSL_VERSION_INFO >= (1, 0):
168            ssl.OP_NO_COMPRESSION
169        self.assertIn(ssl.HAS_SNI, {True, False})
170        self.assertIn(ssl.HAS_ECDH, {True, False})
171
172    def test_random(self):
173        v = ssl.RAND_status()
174        if support.verbose:
175            sys.stdout.write("\n RAND_status is %d (%s)\n"
176                             % (v, (v and "sufficient randomness") or
177                                "insufficient randomness"))
178        if hasattr(ssl, 'RAND_egd'):
179            self.assertRaises(TypeError, ssl.RAND_egd, 1)
180            self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
181        ssl.RAND_add("this is a random string", 75.0)
182
183    def test_parse_cert(self):
184        # note that this uses an 'unofficial' function in _ssl.c,
185        # provided solely for this test, to exercise the certificate
186        # parsing code
187        p = ssl._ssl._test_decode_cert(CERTFILE)
188        if support.verbose:
189            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
190        self.assertEqual(p['issuer'],
191                         ((('countryName', 'XY'),),
192                          (('localityName', 'Castle Anthrax'),),
193                          (('organizationName', 'Python Software Foundation'),),
194                          (('commonName', 'localhost'),))
195                        )
196        # Note the next three asserts will fail if the keys are regenerated
197        self.assertEqual(p['notAfter'], asn1time('Oct  5 23:01:56 2020 GMT'))
198        self.assertEqual(p['notBefore'], asn1time('Oct  8 23:01:56 2010 GMT'))
199        self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E')
200        self.assertEqual(p['subject'],
201                         ((('countryName', 'XY'),),
202                          (('localityName', 'Castle Anthrax'),),
203                          (('organizationName', 'Python Software Foundation'),),
204                          (('commonName', 'localhost'),))
205                        )
206        self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),))
207        # Issue #13034: the subjectAltName in some certificates
208        # (notably projects.developer.nokia.com:443) wasn't parsed
209        p = ssl._ssl._test_decode_cert(NOKIACERT)
210        if support.verbose:
211            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
212        self.assertEqual(p['subjectAltName'],
213                         (('DNS', 'projects.developer.nokia.com'),
214                          ('DNS', 'projects.forum.nokia.com'))
215                        )
216        # extra OCSP and AIA fields
217        self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',))
218        self.assertEqual(p['caIssuers'],
219                         ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',))
220        self.assertEqual(p['crlDistributionPoints'],
221                         ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',))
222
223    def test_parse_cert_CVE_2013_4238(self):
224        p = ssl._ssl._test_decode_cert(NULLBYTECERT)
225        if support.verbose:
226            sys.stdout.write("\n" + pprint.pformat(p) + "\n")
227        subject = ((('countryName', 'US'),),
228                   (('stateOrProvinceName', 'Oregon'),),
229                   (('localityName', 'Beaverton'),),
230                   (('organizationName', 'Python Software Foundation'),),
231                   (('organizationalUnitName', 'Python Core Development'),),
232                   (('commonName', 'null.python.org\x00example.org'),),
233                   (('emailAddress', 'python-dev@python.org'),))
234        self.assertEqual(p['subject'], subject)
235        self.assertEqual(p['issuer'], subject)
236        if ssl._OPENSSL_API_VERSION >= (0, 9, 8):
237            san = (('DNS', 'altnull.python.org\x00example.com'),
238                   ('email', 'null@python.org\x00user@example.org'),
239                   ('URI', 'http://null.python.org\x00http://example.org'),
240                   ('IP Address', '192.0.2.1'),
241                   ('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
242        else:
243            # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
244            san = (('DNS', 'altnull.python.org\x00example.com'),
245                   ('email', 'null@python.org\x00user@example.org'),
246                   ('URI', 'http://null.python.org\x00http://example.org'),
247                   ('IP Address', '192.0.2.1'),
248                   ('IP Address', '<invalid>'))
249
250        self.assertEqual(p['subjectAltName'], san)
251
252    def test_parse_all_sans(self):
253        p = ssl._ssl._test_decode_cert(ALLSANFILE)
254        self.assertEqual(p['subjectAltName'],
255            (
256                ('DNS', 'allsans'),
257                ('othername', '<unsupported>'),
258                ('othername', '<unsupported>'),
259                ('email', 'user@example.org'),
260                ('DNS', 'www.example.org'),
261                ('DirName',
262                    ((('countryName', 'XY'),),
263                    (('localityName', 'Castle Anthrax'),),
264                    (('organizationName', 'Python Software Foundation'),),
265                    (('commonName', 'dirname example'),))),
266                ('URI', 'https://www.python.org/'),
267                ('IP Address', '127.0.0.1'),
268                ('IP Address', '0:0:0:0:0:0:0:1\n'),
269                ('Registered ID', '1.2.3.4.5')
270            )
271        )
272
273    def test_DER_to_PEM(self):
274        with open(CAFILE_CACERT, 'r') as f:
275            pem = f.read()
276        d1 = ssl.PEM_cert_to_DER_cert(pem)
277        p2 = ssl.DER_cert_to_PEM_cert(d1)
278        d2 = ssl.PEM_cert_to_DER_cert(p2)
279        self.assertEqual(d1, d2)
280        if not p2.startswith(ssl.PEM_HEADER + '\n'):
281            self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2)
282        if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'):
283            self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2)
284
285    def test_openssl_version(self):
286        n = ssl.OPENSSL_VERSION_NUMBER
287        t = ssl.OPENSSL_VERSION_INFO
288        s = ssl.OPENSSL_VERSION
289        self.assertIsInstance(n, (int, long))
290        self.assertIsInstance(t, tuple)
291        self.assertIsInstance(s, str)
292        # Some sanity checks follow
293        # >= 0.9
294        self.assertGreaterEqual(n, 0x900000)
295        # < 3.0
296        self.assertLess(n, 0x30000000)
297        major, minor, fix, patch, status = t
298        self.assertGreaterEqual(major, 0)
299        self.assertLess(major, 3)
300        self.assertGreaterEqual(minor, 0)
301        self.assertLess(minor, 256)
302        self.assertGreaterEqual(fix, 0)
303        self.assertLess(fix, 256)
304        self.assertGreaterEqual(patch, 0)
305        self.assertLessEqual(patch, 63)
306        self.assertGreaterEqual(status, 0)
307        self.assertLessEqual(status, 15)
308        # Version string as returned by {Open,Libre}SSL, the format might change
309        if IS_LIBRESSL:
310            self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
311                            (s, t, hex(n)))
312        else:
313            self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
314                            (s, t))
315
316    @support.cpython_only
317    def test_refcycle(self):
318        # Issue #7943: an SSL object doesn't create reference cycles with
319        # itself.
320        s = socket.socket(socket.AF_INET)
321        ss = ssl.wrap_socket(s)
322        wr = weakref.ref(ss)
323        del ss
324        self.assertEqual(wr(), None)
325
326    def test_wrapped_unconnected(self):
327        # Methods on an unconnected SSLSocket propagate the original
328        # socket.error raise by the underlying socket object.
329        s = socket.socket(socket.AF_INET)
330        with closing(ssl.wrap_socket(s)) as ss:
331            self.assertRaises(socket.error, ss.recv, 1)
332            self.assertRaises(socket.error, ss.recv_into, bytearray(b'x'))
333            self.assertRaises(socket.error, ss.recvfrom, 1)
334            self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1)
335            self.assertRaises(socket.error, ss.send, b'x')
336            self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0))
337
338    def test_timeout(self):
339        # Issue #8524: when creating an SSL socket, the timeout of the
340        # original socket should be retained.
341        for timeout in (None, 0.0, 5.0):
342            s = socket.socket(socket.AF_INET)
343            s.settimeout(timeout)
344            with closing(ssl.wrap_socket(s)) as ss:
345                self.assertEqual(timeout, ss.gettimeout())
346
347    def test_errors(self):
348        sock = socket.socket()
349        self.assertRaisesRegexp(ValueError,
350                        "certfile must be specified",
351                        ssl.wrap_socket, sock, keyfile=CERTFILE)
352        self.assertRaisesRegexp(ValueError,
353                        "certfile must be specified for server-side operations",
354                        ssl.wrap_socket, sock, server_side=True)
355        self.assertRaisesRegexp(ValueError,
356                        "certfile must be specified for server-side operations",
357                        ssl.wrap_socket, sock, server_side=True, certfile="")
358        with closing(ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE)) as s:
359            self.assertRaisesRegexp(ValueError, "can't connect in server-side mode",
360                                    s.connect, (HOST, 8080))
361        with self.assertRaises(IOError) as cm:
362            with closing(socket.socket()) as sock:
363                ssl.wrap_socket(sock, certfile=NONEXISTINGCERT)
364        self.assertEqual(cm.exception.errno, errno.ENOENT)
365        with self.assertRaises(IOError) as cm:
366            with closing(socket.socket()) as sock:
367                ssl.wrap_socket(sock,
368                    certfile=CERTFILE, keyfile=NONEXISTINGCERT)
369        self.assertEqual(cm.exception.errno, errno.ENOENT)
370        with self.assertRaises(IOError) as cm:
371            with closing(socket.socket()) as sock:
372                ssl.wrap_socket(sock,
373                    certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT)
374        self.assertEqual(cm.exception.errno, errno.ENOENT)
375
376    def bad_cert_test(self, certfile):
377        """Check that trying to use the given client certificate fails"""
378        certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
379                                   certfile)
380        sock = socket.socket()
381        self.addCleanup(sock.close)
382        with self.assertRaises(ssl.SSLError):
383            ssl.wrap_socket(sock,
384                            certfile=certfile,
385                            ssl_version=ssl.PROTOCOL_TLSv1)
386
387    def test_empty_cert(self):
388        """Wrapping with an empty cert file"""
389        self.bad_cert_test("nullcert.pem")
390
391    def test_malformed_cert(self):
392        """Wrapping with a badly formatted certificate (syntax error)"""
393        self.bad_cert_test("badcert.pem")
394
395    def test_malformed_key(self):
396        """Wrapping with a badly formatted key (syntax error)"""
397        self.bad_cert_test("badkey.pem")
398
399    def test_match_hostname(self):
400        def ok(cert, hostname):
401            ssl.match_hostname(cert, hostname)
402        def fail(cert, hostname):
403            self.assertRaises(ssl.CertificateError,
404                              ssl.match_hostname, cert, hostname)
405
406        cert = {'subject': ((('commonName', 'example.com'),),)}
407        ok(cert, 'example.com')
408        ok(cert, 'ExAmple.cOm')
409        fail(cert, 'www.example.com')
410        fail(cert, '.example.com')
411        fail(cert, 'example.org')
412        fail(cert, 'exampleXcom')
413
414        cert = {'subject': ((('commonName', '*.a.com'),),)}
415        ok(cert, 'foo.a.com')
416        fail(cert, 'bar.foo.a.com')
417        fail(cert, 'a.com')
418        fail(cert, 'Xa.com')
419        fail(cert, '.a.com')
420
421        # only match one left-most wildcard
422        cert = {'subject': ((('commonName', 'f*.com'),),)}
423        ok(cert, 'foo.com')
424        ok(cert, 'f.com')
425        fail(cert, 'bar.com')
426        fail(cert, 'foo.a.com')
427        fail(cert, 'bar.foo.com')
428
429        # NULL bytes are bad, CVE-2013-4073
430        cert = {'subject': ((('commonName',
431                              'null.python.org\x00example.org'),),)}
432        ok(cert, 'null.python.org\x00example.org') # or raise an error?
433        fail(cert, 'example.org')
434        fail(cert, 'null.python.org')
435
436        # error cases with wildcards
437        cert = {'subject': ((('commonName', '*.*.a.com'),),)}
438        fail(cert, 'bar.foo.a.com')
439        fail(cert, 'a.com')
440        fail(cert, 'Xa.com')
441        fail(cert, '.a.com')
442
443        cert = {'subject': ((('commonName', 'a.*.com'),),)}
444        fail(cert, 'a.foo.com')
445        fail(cert, 'a..com')
446        fail(cert, 'a.com')
447
448        # wildcard doesn't match IDNA prefix 'xn--'
449        idna = u'püthon.python.org'.encode("idna").decode("ascii")
450        cert = {'subject': ((('commonName', idna),),)}
451        ok(cert, idna)
452        cert = {'subject': ((('commonName', 'x*.python.org'),),)}
453        fail(cert, idna)
454        cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)}
455        fail(cert, idna)
456
457        # wildcard in first fragment and  IDNA A-labels in sequent fragments
458        # are supported.
459        idna = u'www*.pythön.org'.encode("idna").decode("ascii")
460        cert = {'subject': ((('commonName', idna),),)}
461        ok(cert, u'www.pythön.org'.encode("idna").decode("ascii"))
462        ok(cert, u'www1.pythön.org'.encode("idna").decode("ascii"))
463        fail(cert, u'ftp.pythön.org'.encode("idna").decode("ascii"))
464        fail(cert, u'pythön.org'.encode("idna").decode("ascii"))
465
466        # Slightly fake real-world example
467        cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT',
468                'subject': ((('commonName', 'linuxfrz.org'),),),
469                'subjectAltName': (('DNS', 'linuxfr.org'),
470                                   ('DNS', 'linuxfr.com'),
471                                   ('othername', '<unsupported>'))}
472        ok(cert, 'linuxfr.org')
473        ok(cert, 'linuxfr.com')
474        # Not a "DNS" entry
475        fail(cert, '<unsupported>')
476        # When there is a subjectAltName, commonName isn't used
477        fail(cert, 'linuxfrz.org')
478
479        # A pristine real-world example
480        cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
481                'subject': ((('countryName', 'US'),),
482                            (('stateOrProvinceName', 'California'),),
483                            (('localityName', 'Mountain View'),),
484                            (('organizationName', 'Google Inc'),),
485                            (('commonName', 'mail.google.com'),))}
486        ok(cert, 'mail.google.com')
487        fail(cert, 'gmail.com')
488        # Only commonName is considered
489        fail(cert, 'California')
490
491        # Neither commonName nor subjectAltName
492        cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT',
493                'subject': ((('countryName', 'US'),),
494                            (('stateOrProvinceName', 'California'),),
495                            (('localityName', 'Mountain View'),),
496                            (('organizationName', 'Google Inc'),))}
497        fail(cert, 'mail.google.com')
498
499        # No DNS entry in subjectAltName but a commonName
500        cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
501                'subject': ((('countryName', 'US'),),
502                            (('stateOrProvinceName', 'California'),),
503                            (('localityName', 'Mountain View'),),
504                            (('commonName', 'mail.google.com'),)),
505                'subjectAltName': (('othername', 'blabla'), )}
506        ok(cert, 'mail.google.com')
507
508        # No DNS entry subjectAltName and no commonName
509        cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT',
510                'subject': ((('countryName', 'US'),),
511                            (('stateOrProvinceName', 'California'),),
512                            (('localityName', 'Mountain View'),),
513                            (('organizationName', 'Google Inc'),)),
514                'subjectAltName': (('othername', 'blabla'),)}
515        fail(cert, 'google.com')
516
517        # Empty cert / no cert
518        self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com')
519        self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com')
520
521        # Issue #17980: avoid denials of service by refusing more than one
522        # wildcard per fragment.
523        cert = {'subject': ((('commonName', 'a*b.com'),),)}
524        ok(cert, 'axxb.com')
525        cert = {'subject': ((('commonName', 'a*b.co*'),),)}
526        fail(cert, 'axxb.com')
527        cert = {'subject': ((('commonName', 'a*b*.com'),),)}
528        with self.assertRaises(ssl.CertificateError) as cm:
529            ssl.match_hostname(cert, 'axxbxxc.com')
530        self.assertIn("too many wildcards", str(cm.exception))
531
532    def test_server_side(self):
533        # server_hostname doesn't work for server sockets
534        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
535        with closing(socket.socket()) as sock:
536            self.assertRaises(ValueError, ctx.wrap_socket, sock, True,
537                              server_hostname="some.hostname")
538
539    def test_unknown_channel_binding(self):
540        # should raise ValueError for unknown type
541        s = socket.socket(socket.AF_INET)
542        with closing(ssl.wrap_socket(s)) as ss:
543            with self.assertRaises(ValueError):
544                ss.get_channel_binding("unknown-type")
545
546    @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
547                         "'tls-unique' channel binding not available")
548    def test_tls_unique_channel_binding(self):
549        # unconnected should return None for known type
550        s = socket.socket(socket.AF_INET)
551        with closing(ssl.wrap_socket(s)) as ss:
552            self.assertIsNone(ss.get_channel_binding("tls-unique"))
553        # the same for server-side
554        s = socket.socket(socket.AF_INET)
555        with closing(ssl.wrap_socket(s, server_side=True, certfile=CERTFILE)) as ss:
556            self.assertIsNone(ss.get_channel_binding("tls-unique"))
557
558    def test_get_default_verify_paths(self):
559        paths = ssl.get_default_verify_paths()
560        self.assertEqual(len(paths), 6)
561        self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
562
563        with support.EnvironmentVarGuard() as env:
564            env["SSL_CERT_DIR"] = CAPATH
565            env["SSL_CERT_FILE"] = CERTFILE
566            paths = ssl.get_default_verify_paths()
567            self.assertEqual(paths.cafile, CERTFILE)
568            self.assertEqual(paths.capath, CAPATH)
569
570    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
571    def test_enum_certificates(self):
572        self.assertTrue(ssl.enum_certificates("CA"))
573        self.assertTrue(ssl.enum_certificates("ROOT"))
574
575        self.assertRaises(TypeError, ssl.enum_certificates)
576        self.assertRaises(WindowsError, ssl.enum_certificates, "")
577
578        trust_oids = set()
579        for storename in ("CA", "ROOT"):
580            store = ssl.enum_certificates(storename)
581            self.assertIsInstance(store, list)
582            for element in store:
583                self.assertIsInstance(element, tuple)
584                self.assertEqual(len(element), 3)
585                cert, enc, trust = element
586                self.assertIsInstance(cert, bytes)
587                self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
588                self.assertIsInstance(trust, (set, bool))
589                if isinstance(trust, set):
590                    trust_oids.update(trust)
591
592        serverAuth = "1.3.6.1.5.5.7.3.1"
593        self.assertIn(serverAuth, trust_oids)
594
595    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
596    def test_enum_crls(self):
597        self.assertTrue(ssl.enum_crls("CA"))
598        self.assertRaises(TypeError, ssl.enum_crls)
599        self.assertRaises(WindowsError, ssl.enum_crls, "")
600
601        crls = ssl.enum_crls("CA")
602        self.assertIsInstance(crls, list)
603        for element in crls:
604            self.assertIsInstance(element, tuple)
605            self.assertEqual(len(element), 2)
606            self.assertIsInstance(element[0], bytes)
607            self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
608
609
610    def test_asn1object(self):
611        expected = (129, 'serverAuth', 'TLS Web Server Authentication',
612                    '1.3.6.1.5.5.7.3.1')
613
614        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
615        self.assertEqual(val, expected)
616        self.assertEqual(val.nid, 129)
617        self.assertEqual(val.shortname, 'serverAuth')
618        self.assertEqual(val.longname, 'TLS Web Server Authentication')
619        self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
620        self.assertIsInstance(val, ssl._ASN1Object)
621        self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
622
623        val = ssl._ASN1Object.fromnid(129)
624        self.assertEqual(val, expected)
625        self.assertIsInstance(val, ssl._ASN1Object)
626        self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
627        with self.assertRaisesRegexp(ValueError, "unknown NID 100000"):
628            ssl._ASN1Object.fromnid(100000)
629        for i in range(1000):
630            try:
631                obj = ssl._ASN1Object.fromnid(i)
632            except ValueError:
633                pass
634            else:
635                self.assertIsInstance(obj.nid, int)
636                self.assertIsInstance(obj.shortname, str)
637                self.assertIsInstance(obj.longname, str)
638                self.assertIsInstance(obj.oid, (str, type(None)))
639
640        val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
641        self.assertEqual(val, expected)
642        self.assertIsInstance(val, ssl._ASN1Object)
643        self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
644        self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
645                         expected)
646        with self.assertRaisesRegexp(ValueError, "unknown object 'serverauth'"):
647            ssl._ASN1Object.fromname('serverauth')
648
649    def test_purpose_enum(self):
650        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
651        self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
652        self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
653        self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
654        self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
655        self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
656                              '1.3.6.1.5.5.7.3.1')
657
658        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
659        self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
660        self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
661        self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
662        self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
663        self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
664                              '1.3.6.1.5.5.7.3.2')
665
666    def test_unsupported_dtls(self):
667        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
668        self.addCleanup(s.close)
669        with self.assertRaises(NotImplementedError) as cx:
670            ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE)
671        self.assertEqual(str(cx.exception), "only stream sockets are supported")
672        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
673        with self.assertRaises(NotImplementedError) as cx:
674            ctx.wrap_socket(s)
675        self.assertEqual(str(cx.exception), "only stream sockets are supported")
676
677    def cert_time_ok(self, timestring, timestamp):
678        self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp)
679
680    def cert_time_fail(self, timestring):
681        with self.assertRaises(ValueError):
682            ssl.cert_time_to_seconds(timestring)
683
684    @unittest.skipUnless(utc_offset(),
685                         'local time needs to be different from UTC')
686    def test_cert_time_to_seconds_timezone(self):
687        # Issue #19940: ssl.cert_time_to_seconds() returns wrong
688        #               results if local timezone is not UTC
689        self.cert_time_ok("May  9 00:00:00 2007 GMT", 1178668800.0)
690        self.cert_time_ok("Jan  5 09:34:43 2018 GMT", 1515144883.0)
691
692    def test_cert_time_to_seconds(self):
693        timestring = "Jan  5 09:34:43 2018 GMT"
694        ts = 1515144883.0
695        self.cert_time_ok(timestring, ts)
696        # accept keyword parameter, assert its name
697        self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts)
698        # accept both %e and %d (space or zero generated by strftime)
699        self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts)
700        # case-insensitive
701        self.cert_time_ok("JaN  5 09:34:43 2018 GmT", ts)
702        self.cert_time_fail("Jan  5 09:34 2018 GMT")     # no seconds
703        self.cert_time_fail("Jan  5 09:34:43 2018")      # no GMT
704        self.cert_time_fail("Jan  5 09:34:43 2018 UTC")  # not GMT timezone
705        self.cert_time_fail("Jan 35 09:34:43 2018 GMT")  # invalid day
706        self.cert_time_fail("Jon  5 09:34:43 2018 GMT")  # invalid month
707        self.cert_time_fail("Jan  5 24:00:00 2018 GMT")  # invalid hour
708        self.cert_time_fail("Jan  5 09:60:43 2018 GMT")  # invalid minute
709
710        newyear_ts = 1230768000.0
711        # leap seconds
712        self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts)
713        # same timestamp
714        self.cert_time_ok("Jan  1 00:00:00 2009 GMT", newyear_ts)
715
716        self.cert_time_ok("Jan  5 09:34:59 2018 GMT", 1515144899)
717        #  allow 60th second (even if it is not a leap second)
718        self.cert_time_ok("Jan  5 09:34:60 2018 GMT", 1515144900)
719        #  allow 2nd leap second for compatibility with time.strptime()
720        self.cert_time_ok("Jan  5 09:34:61 2018 GMT", 1515144901)
721        self.cert_time_fail("Jan  5 09:34:62 2018 GMT")  # invalid seconds
722
723        # no special treatement for the special value:
724        #   99991231235959Z (rfc 5280)
725        self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0)
726
727    @support.run_with_locale('LC_ALL', '')
728    def test_cert_time_to_seconds_locale(self):
729        # `cert_time_to_seconds()` should be locale independent
730
731        def local_february_name():
732            return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0))
733
734        if local_february_name().lower() == 'feb':
735            self.skipTest("locale-specific month name needs to be "
736                          "different from C locale")
737
738        # locale-independent
739        self.cert_time_ok("Feb  9 00:00:00 2007 GMT", 1170979200.0)
740        self.cert_time_fail(local_february_name() + "  9 00:00:00 2007 GMT")
741
742
743class ContextTests(unittest.TestCase):
744
745    @skip_if_broken_ubuntu_ssl
746    def test_constructor(self):
747        for protocol in PROTOCOLS:
748            ssl.SSLContext(protocol)
749        self.assertRaises(TypeError, ssl.SSLContext)
750        self.assertRaises(ValueError, ssl.SSLContext, -1)
751        self.assertRaises(ValueError, ssl.SSLContext, 42)
752
753    @skip_if_broken_ubuntu_ssl
754    def test_protocol(self):
755        for proto in PROTOCOLS:
756            ctx = ssl.SSLContext(proto)
757            self.assertEqual(ctx.protocol, proto)
758
759    def test_ciphers(self):
760        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
761        ctx.set_ciphers("ALL")
762        ctx.set_ciphers("DEFAULT")
763        with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
764            ctx.set_ciphers("^$:,;?*'dorothyx")
765
766    @skip_if_broken_ubuntu_ssl
767    def test_options(self):
768        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
769        # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
770        default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
771        if not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0):
772            default |= ssl.OP_NO_COMPRESSION
773        self.assertEqual(default, ctx.options)
774        ctx.options |= ssl.OP_NO_TLSv1
775        self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
776        if can_clear_options():
777            ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
778            self.assertEqual(default, ctx.options)
779            ctx.options = 0
780            self.assertEqual(0, ctx.options)
781        else:
782            with self.assertRaises(ValueError):
783                ctx.options = 0
784
785    def test_verify_mode(self):
786        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
787        # Default value
788        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
789        ctx.verify_mode = ssl.CERT_OPTIONAL
790        self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL)
791        ctx.verify_mode = ssl.CERT_REQUIRED
792        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
793        ctx.verify_mode = ssl.CERT_NONE
794        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
795        with self.assertRaises(TypeError):
796            ctx.verify_mode = None
797        with self.assertRaises(ValueError):
798            ctx.verify_mode = 42
799
800    @unittest.skipUnless(have_verify_flags(),
801                         "verify_flags need OpenSSL > 0.9.8")
802    def test_verify_flags(self):
803        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
804        # default value
805        tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
806        self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf)
807        ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF
808        self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF)
809        ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN
810        self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN)
811        ctx.verify_flags = ssl.VERIFY_DEFAULT
812        self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT)
813        # supports any value
814        ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT
815        self.assertEqual(ctx.verify_flags,
816                         ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT)
817        with self.assertRaises(TypeError):
818            ctx.verify_flags = None
819
820    def test_load_cert_chain(self):
821        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
822        # Combined key and cert in a single file
823        ctx.load_cert_chain(CERTFILE, keyfile=None)
824        ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE)
825        self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE)
826        with self.assertRaises(IOError) as cm:
827            ctx.load_cert_chain(NONEXISTINGCERT)
828        self.assertEqual(cm.exception.errno, errno.ENOENT)
829        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
830            ctx.load_cert_chain(BADCERT)
831        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
832            ctx.load_cert_chain(EMPTYCERT)
833        # Separate key and cert
834        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
835        ctx.load_cert_chain(ONLYCERT, ONLYKEY)
836        ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY)
837        ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY)
838        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
839            ctx.load_cert_chain(ONLYCERT)
840        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
841            ctx.load_cert_chain(ONLYKEY)
842        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
843            ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT)
844        # Mismatching key and cert
845        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
846        with self.assertRaisesRegexp(ssl.SSLError, "key values mismatch"):
847            ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY)
848        # Password protected key and cert
849        ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD)
850        ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode())
851        ctx.load_cert_chain(CERTFILE_PROTECTED,
852                            password=bytearray(KEY_PASSWORD.encode()))
853        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD)
854        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode())
855        ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED,
856                            bytearray(KEY_PASSWORD.encode()))
857        with self.assertRaisesRegexp(TypeError, "should be a string"):
858            ctx.load_cert_chain(CERTFILE_PROTECTED, password=True)
859        with self.assertRaises(ssl.SSLError):
860            ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass")
861        with self.assertRaisesRegexp(ValueError, "cannot be longer"):
862            # openssl has a fixed limit on the password buffer.
863            # PEM_BUFSIZE is generally set to 1kb.
864            # Return a string larger than this.
865            ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400)
866        # Password callback
867        def getpass_unicode():
868            return KEY_PASSWORD
869        def getpass_bytes():
870            return KEY_PASSWORD.encode()
871        def getpass_bytearray():
872            return bytearray(KEY_PASSWORD.encode())
873        def getpass_badpass():
874            return "badpass"
875        def getpass_huge():
876            return b'a' * (1024 * 1024)
877        def getpass_bad_type():
878            return 9
879        def getpass_exception():
880            raise Exception('getpass error')
881        class GetPassCallable:
882            def __call__(self):
883                return KEY_PASSWORD
884            def getpass(self):
885                return KEY_PASSWORD
886        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode)
887        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes)
888        ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray)
889        ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable())
890        ctx.load_cert_chain(CERTFILE_PROTECTED,
891                            password=GetPassCallable().getpass)
892        with self.assertRaises(ssl.SSLError):
893            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass)
894        with self.assertRaisesRegexp(ValueError, "cannot be longer"):
895            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge)
896        with self.assertRaisesRegexp(TypeError, "must return a string"):
897            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type)
898        with self.assertRaisesRegexp(Exception, "getpass error"):
899            ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception)
900        # Make sure the password function isn't called if it isn't needed
901        ctx.load_cert_chain(CERTFILE, password=getpass_exception)
902
903    def test_load_verify_locations(self):
904        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
905        ctx.load_verify_locations(CERTFILE)
906        ctx.load_verify_locations(cafile=CERTFILE, capath=None)
907        ctx.load_verify_locations(BYTES_CERTFILE)
908        ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None)
909        ctx.load_verify_locations(cafile=BYTES_CERTFILE.decode('utf-8'))
910        self.assertRaises(TypeError, ctx.load_verify_locations)
911        self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None)
912        with self.assertRaises(IOError) as cm:
913            ctx.load_verify_locations(NONEXISTINGCERT)
914        self.assertEqual(cm.exception.errno, errno.ENOENT)
915        with self.assertRaises(IOError):
916            ctx.load_verify_locations(u'')
917        with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"):
918            ctx.load_verify_locations(BADCERT)
919        ctx.load_verify_locations(CERTFILE, CAPATH)
920        ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH)
921
922        # Issue #10989: crash if the second argument type is invalid
923        self.assertRaises(TypeError, ctx.load_verify_locations, None, True)
924
925    def test_load_verify_cadata(self):
926        # test cadata
927        with open(CAFILE_CACERT) as f:
928            cacert_pem = f.read().decode("ascii")
929        cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem)
930        with open(CAFILE_NEURONIO) as f:
931            neuronio_pem = f.read().decode("ascii")
932        neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem)
933
934        # test PEM
935        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
936        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0)
937        ctx.load_verify_locations(cadata=cacert_pem)
938        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1)
939        ctx.load_verify_locations(cadata=neuronio_pem)
940        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
941        # cert already in hash table
942        ctx.load_verify_locations(cadata=neuronio_pem)
943        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
944
945        # combined
946        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
947        combined = "\n".join((cacert_pem, neuronio_pem))
948        ctx.load_verify_locations(cadata=combined)
949        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
950
951        # with junk around the certs
952        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
953        combined = ["head", cacert_pem, "other", neuronio_pem, "again",
954                    neuronio_pem, "tail"]
955        ctx.load_verify_locations(cadata="\n".join(combined))
956        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
957
958        # test DER
959        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
960        ctx.load_verify_locations(cadata=cacert_der)
961        ctx.load_verify_locations(cadata=neuronio_der)
962        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
963        # cert already in hash table
964        ctx.load_verify_locations(cadata=cacert_der)
965        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
966
967        # combined
968        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
969        combined = b"".join((cacert_der, neuronio_der))
970        ctx.load_verify_locations(cadata=combined)
971        self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2)
972
973        # error cases
974        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
975        self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object)
976
977        with self.assertRaisesRegexp(ssl.SSLError, "no start line"):
978            ctx.load_verify_locations(cadata=u"broken")
979        with self.assertRaisesRegexp(ssl.SSLError, "not enough data"):
980            ctx.load_verify_locations(cadata=b"broken")
981
982
983    def test_load_dh_params(self):
984        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
985        ctx.load_dh_params(DHFILE)
986        if os.name != 'nt':
987            ctx.load_dh_params(BYTES_DHFILE)
988        self.assertRaises(TypeError, ctx.load_dh_params)
989        self.assertRaises(TypeError, ctx.load_dh_params, None)
990        with self.assertRaises(IOError) as cm:
991            ctx.load_dh_params(NONEXISTINGCERT)
992        self.assertEqual(cm.exception.errno, errno.ENOENT)
993        with self.assertRaises(ssl.SSLError) as cm:
994            ctx.load_dh_params(CERTFILE)
995
996    @skip_if_broken_ubuntu_ssl
997    def test_session_stats(self):
998        for proto in PROTOCOLS:
999            ctx = ssl.SSLContext(proto)
1000            self.assertEqual(ctx.session_stats(), {
1001                'number': 0,
1002                'connect': 0,
1003                'connect_good': 0,
1004                'connect_renegotiate': 0,
1005                'accept': 0,
1006                'accept_good': 0,
1007                'accept_renegotiate': 0,
1008                'hits': 0,
1009                'misses': 0,
1010                'timeouts': 0,
1011                'cache_full': 0,
1012            })
1013
1014    def test_set_default_verify_paths(self):
1015        # There's not much we can do to test that it acts as expected,
1016        # so just check it doesn't crash or raise an exception.
1017        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1018        ctx.set_default_verify_paths()
1019
1020    @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build")
1021    def test_set_ecdh_curve(self):
1022        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1023        ctx.set_ecdh_curve("prime256v1")
1024        ctx.set_ecdh_curve(b"prime256v1")
1025        self.assertRaises(TypeError, ctx.set_ecdh_curve)
1026        self.assertRaises(TypeError, ctx.set_ecdh_curve, None)
1027        self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo")
1028        self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo")
1029
1030    @needs_sni
1031    def test_sni_callback(self):
1032        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1033
1034        # set_servername_callback expects a callable, or None
1035        self.assertRaises(TypeError, ctx.set_servername_callback)
1036        self.assertRaises(TypeError, ctx.set_servername_callback, 4)
1037        self.assertRaises(TypeError, ctx.set_servername_callback, "")
1038        self.assertRaises(TypeError, ctx.set_servername_callback, ctx)
1039
1040        def dummycallback(sock, servername, ctx):
1041            pass
1042        ctx.set_servername_callback(None)
1043        ctx.set_servername_callback(dummycallback)
1044
1045    @needs_sni
1046    def test_sni_callback_refcycle(self):
1047        # Reference cycles through the servername callback are detected
1048        # and cleared.
1049        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1050        def dummycallback(sock, servername, ctx, cycle=ctx):
1051            pass
1052        ctx.set_servername_callback(dummycallback)
1053        wr = weakref.ref(ctx)
1054        del ctx, dummycallback
1055        gc.collect()
1056        self.assertIs(wr(), None)
1057
1058    def test_cert_store_stats(self):
1059        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1060        self.assertEqual(ctx.cert_store_stats(),
1061            {'x509_ca': 0, 'crl': 0, 'x509': 0})
1062        ctx.load_cert_chain(CERTFILE)
1063        self.assertEqual(ctx.cert_store_stats(),
1064            {'x509_ca': 0, 'crl': 0, 'x509': 0})
1065        ctx.load_verify_locations(CERTFILE)
1066        self.assertEqual(ctx.cert_store_stats(),
1067            {'x509_ca': 0, 'crl': 0, 'x509': 1})
1068        ctx.load_verify_locations(CAFILE_CACERT)
1069        self.assertEqual(ctx.cert_store_stats(),
1070            {'x509_ca': 1, 'crl': 0, 'x509': 2})
1071
1072    def test_get_ca_certs(self):
1073        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1074        self.assertEqual(ctx.get_ca_certs(), [])
1075        # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE
1076        ctx.load_verify_locations(CERTFILE)
1077        self.assertEqual(ctx.get_ca_certs(), [])
1078        # but CAFILE_CACERT is a CA cert
1079        ctx.load_verify_locations(CAFILE_CACERT)
1080        self.assertEqual(ctx.get_ca_certs(),
1081            [{'issuer': ((('organizationName', 'Root CA'),),
1082                         (('organizationalUnitName', 'http://www.cacert.org'),),
1083                         (('commonName', 'CA Cert Signing Authority'),),
1084                         (('emailAddress', 'support@cacert.org'),)),
1085              'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'),
1086              'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'),
1087              'serialNumber': '00',
1088              'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',),
1089              'subject': ((('organizationName', 'Root CA'),),
1090                          (('organizationalUnitName', 'http://www.cacert.org'),),
1091                          (('commonName', 'CA Cert Signing Authority'),),
1092                          (('emailAddress', 'support@cacert.org'),)),
1093              'version': 3}])
1094
1095        with open(CAFILE_CACERT) as f:
1096            pem = f.read()
1097        der = ssl.PEM_cert_to_DER_cert(pem)
1098        self.assertEqual(ctx.get_ca_certs(True), [der])
1099
1100    def test_load_default_certs(self):
1101        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1102        ctx.load_default_certs()
1103
1104        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1105        ctx.load_default_certs(ssl.Purpose.SERVER_AUTH)
1106        ctx.load_default_certs()
1107
1108        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1109        ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH)
1110
1111        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1112        self.assertRaises(TypeError, ctx.load_default_certs, None)
1113        self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
1114
1115    @unittest.skipIf(sys.platform == "win32", "not-Windows specific")
1116    @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
1117    def test_load_default_certs_env(self):
1118        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1119        with support.EnvironmentVarGuard() as env:
1120            env["SSL_CERT_DIR"] = CAPATH
1121            env["SSL_CERT_FILE"] = CERTFILE
1122            ctx.load_default_certs()
1123            self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0})
1124
1125    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
1126    def test_load_default_certs_env_windows(self):
1127        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1128        ctx.load_default_certs()
1129        stats = ctx.cert_store_stats()
1130
1131        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1132        with support.EnvironmentVarGuard() as env:
1133            env["SSL_CERT_DIR"] = CAPATH
1134            env["SSL_CERT_FILE"] = CERTFILE
1135            ctx.load_default_certs()
1136            stats["x509"] += 1
1137            self.assertEqual(ctx.cert_store_stats(), stats)
1138
1139    def test_create_default_context(self):
1140        ctx = ssl.create_default_context()
1141        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1142        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1143        self.assertTrue(ctx.check_hostname)
1144        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1145        self.assertEqual(
1146            ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1147            getattr(ssl, "OP_NO_COMPRESSION", 0),
1148        )
1149
1150        with open(SIGNING_CA) as f:
1151            cadata = f.read().decode("ascii")
1152        ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
1153                                         cadata=cadata)
1154        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1155        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1156        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1157        self.assertEqual(
1158            ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1159            getattr(ssl, "OP_NO_COMPRESSION", 0),
1160        )
1161
1162        ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
1163        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1164        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1165        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1166        self.assertEqual(
1167            ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
1168            getattr(ssl, "OP_NO_COMPRESSION", 0),
1169        )
1170        self.assertEqual(
1171            ctx.options & getattr(ssl, "OP_SINGLE_DH_USE", 0),
1172            getattr(ssl, "OP_SINGLE_DH_USE", 0),
1173        )
1174        self.assertEqual(
1175            ctx.options & getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
1176            getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
1177        )
1178
1179    def test__create_stdlib_context(self):
1180        ctx = ssl._create_stdlib_context()
1181        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1182        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1183        self.assertFalse(ctx.check_hostname)
1184        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1185
1186        ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
1187        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1188        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1189        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1190
1191        ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
1192                                         cert_reqs=ssl.CERT_REQUIRED,
1193                                         check_hostname=True)
1194        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
1195        self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
1196        self.assertTrue(ctx.check_hostname)
1197        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1198
1199        ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
1200        self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
1201        self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
1202        self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
1203
1204    def test__https_verify_certificates(self):
1205        # Unit test to check the contect factory mapping
1206        # The factories themselves are tested above
1207        # This test will fail by design if run under PYTHONHTTPSVERIFY=0
1208        # (as will various test_httplib tests)
1209
1210        # Uses a fresh SSL module to avoid affecting the real one
1211        local_ssl = support.import_fresh_module("ssl")
1212        # Certificate verification is enabled by default
1213        self.assertIs(local_ssl._create_default_https_context,
1214                      local_ssl.create_default_context)
1215        # Turn default verification off
1216        local_ssl._https_verify_certificates(enable=False)
1217        self.assertIs(local_ssl._create_default_https_context,
1218                      local_ssl._create_unverified_context)
1219        # And back on
1220        local_ssl._https_verify_certificates(enable=True)
1221        self.assertIs(local_ssl._create_default_https_context,
1222                      local_ssl.create_default_context)
1223        # The default behaviour is to enable
1224        local_ssl._https_verify_certificates(enable=False)
1225        local_ssl._https_verify_certificates()
1226        self.assertIs(local_ssl._create_default_https_context,
1227                      local_ssl.create_default_context)
1228
1229    def test__https_verify_envvar(self):
1230        # Unit test to check the PYTHONHTTPSVERIFY handling
1231        # Need to use a subprocess so it can still be run under -E
1232        https_is_verified = """import ssl, sys; \
1233            status = "Error: _create_default_https_context does not verify certs" \
1234                       if ssl._create_default_https_context is \
1235                          ssl._create_unverified_context \
1236                     else None; \
1237            sys.exit(status)"""
1238        https_is_not_verified = """import ssl, sys; \
1239            status = "Error: _create_default_https_context verifies certs" \
1240                       if ssl._create_default_https_context is \
1241                          ssl.create_default_context \
1242                     else None; \
1243            sys.exit(status)"""
1244        extra_env = {}
1245        # Omitting it leaves verification on
1246        assert_python_ok("-c", https_is_verified, **extra_env)
1247        # Setting it to zero turns verification off
1248        extra_env[ssl._https_verify_envvar] = "0"
1249        assert_python_ok("-c", https_is_not_verified, **extra_env)
1250        # Any other value should also leave it on
1251        for setting in ("", "1", "enabled", "foo"):
1252            extra_env[ssl._https_verify_envvar] = setting
1253            assert_python_ok("-c", https_is_verified, **extra_env)
1254
1255    def test_check_hostname(self):
1256        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1257        self.assertFalse(ctx.check_hostname)
1258
1259        # Requires CERT_REQUIRED or CERT_OPTIONAL
1260        with self.assertRaises(ValueError):
1261            ctx.check_hostname = True
1262        ctx.verify_mode = ssl.CERT_REQUIRED
1263        self.assertFalse(ctx.check_hostname)
1264        ctx.check_hostname = True
1265        self.assertTrue(ctx.check_hostname)
1266
1267        ctx.verify_mode = ssl.CERT_OPTIONAL
1268        ctx.check_hostname = True
1269        self.assertTrue(ctx.check_hostname)
1270
1271        # Cannot set CERT_NONE with check_hostname enabled
1272        with self.assertRaises(ValueError):
1273            ctx.verify_mode = ssl.CERT_NONE
1274        ctx.check_hostname = False
1275        self.assertFalse(ctx.check_hostname)
1276
1277
1278class SSLErrorTests(unittest.TestCase):
1279
1280    def test_str(self):
1281        # The str() of a SSLError doesn't include the errno
1282        e = ssl.SSLError(1, "foo")
1283        self.assertEqual(str(e), "foo")
1284        self.assertEqual(e.errno, 1)
1285        # Same for a subclass
1286        e = ssl.SSLZeroReturnError(1, "foo")
1287        self.assertEqual(str(e), "foo")
1288        self.assertEqual(e.errno, 1)
1289
1290    def test_lib_reason(self):
1291        # Test the library and reason attributes
1292        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1293        with self.assertRaises(ssl.SSLError) as cm:
1294            ctx.load_dh_params(CERTFILE)
1295        self.assertEqual(cm.exception.library, 'PEM')
1296        self.assertEqual(cm.exception.reason, 'NO_START_LINE')
1297        s = str(cm.exception)
1298        self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s)
1299
1300    def test_subclass(self):
1301        # Check that the appropriate SSLError subclass is raised
1302        # (this only tests one of them)
1303        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1304        with closing(socket.socket()) as s:
1305            s.bind(("127.0.0.1", 0))
1306            s.listen(5)
1307            c = socket.socket()
1308            c.connect(s.getsockname())
1309            c.setblocking(False)
1310            with closing(ctx.wrap_socket(c, False, do_handshake_on_connect=False)) as c:
1311                with self.assertRaises(ssl.SSLWantReadError) as cm:
1312                    c.do_handshake()
1313                s = str(cm.exception)
1314                self.assertTrue(s.startswith("The operation did not complete (read)"), s)
1315                # For compatibility
1316                self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ)
1317
1318
1319class NetworkedTests(unittest.TestCase):
1320
1321    def test_connect(self):
1322        with support.transient_internet(REMOTE_HOST):
1323            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1324                                cert_reqs=ssl.CERT_NONE)
1325            try:
1326                s.connect((REMOTE_HOST, 443))
1327                self.assertEqual({}, s.getpeercert())
1328            finally:
1329                s.close()
1330
1331            # this should fail because we have no verification certs
1332            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1333                                cert_reqs=ssl.CERT_REQUIRED)
1334            self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
1335                                   s.connect, (REMOTE_HOST, 443))
1336            s.close()
1337
1338            # this should succeed because we specify the root cert
1339            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1340                                cert_reqs=ssl.CERT_REQUIRED,
1341                                ca_certs=REMOTE_ROOT_CERT)
1342            try:
1343                s.connect((REMOTE_HOST, 443))
1344                self.assertTrue(s.getpeercert())
1345            finally:
1346                s.close()
1347
1348    def test_connect_ex(self):
1349        # Issue #11326: check connect_ex() implementation
1350        with support.transient_internet(REMOTE_HOST):
1351            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1352                                cert_reqs=ssl.CERT_REQUIRED,
1353                                ca_certs=REMOTE_ROOT_CERT)
1354            try:
1355                self.assertEqual(0, s.connect_ex((REMOTE_HOST, 443)))
1356                self.assertTrue(s.getpeercert())
1357            finally:
1358                s.close()
1359
1360    def test_non_blocking_connect_ex(self):
1361        # Issue #11326: non-blocking connect_ex() should allow handshake
1362        # to proceed after the socket gets ready.
1363        with support.transient_internet(REMOTE_HOST):
1364            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1365                                cert_reqs=ssl.CERT_REQUIRED,
1366                                ca_certs=REMOTE_ROOT_CERT,
1367                                do_handshake_on_connect=False)
1368            try:
1369                s.setblocking(False)
1370                rc = s.connect_ex((REMOTE_HOST, 443))
1371                # EWOULDBLOCK under Windows, EINPROGRESS elsewhere
1372                self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK))
1373                # Wait for connect to finish
1374                select.select([], [s], [], 5.0)
1375                # Non-blocking handshake
1376                while True:
1377                    try:
1378                        s.do_handshake()
1379                        break
1380                    except ssl.SSLWantReadError:
1381                        select.select([s], [], [], 5.0)
1382                    except ssl.SSLWantWriteError:
1383                        select.select([], [s], [], 5.0)
1384                # SSL established
1385                self.assertTrue(s.getpeercert())
1386            finally:
1387                s.close()
1388
1389    def test_timeout_connect_ex(self):
1390        # Issue #12065: on a timeout, connect_ex() should return the original
1391        # errno (mimicking the behaviour of non-SSL sockets).
1392        with support.transient_internet(REMOTE_HOST):
1393            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1394                                cert_reqs=ssl.CERT_REQUIRED,
1395                                ca_certs=REMOTE_ROOT_CERT,
1396                                do_handshake_on_connect=False)
1397            try:
1398                s.settimeout(0.0000001)
1399                rc = s.connect_ex((REMOTE_HOST, 443))
1400                if rc == 0:
1401                    self.skipTest("REMOTE_HOST responded too quickly")
1402                self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
1403            finally:
1404                s.close()
1405
1406    def test_connect_ex_error(self):
1407        with support.transient_internet(REMOTE_HOST):
1408            s = ssl.wrap_socket(socket.socket(socket.AF_INET),
1409                                cert_reqs=ssl.CERT_REQUIRED,
1410                                ca_certs=REMOTE_ROOT_CERT)
1411            try:
1412                rc = s.connect_ex((REMOTE_HOST, 444))
1413                # Issue #19919: Windows machines or VMs hosted on Windows
1414                # machines sometimes return EWOULDBLOCK.
1415                errors = (
1416                    errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT,
1417                    errno.EWOULDBLOCK,
1418                )
1419                self.assertIn(rc, errors)
1420            finally:
1421                s.close()
1422
1423    def test_connect_with_context(self):
1424        with support.transient_internet(REMOTE_HOST):
1425            # Same as test_connect, but with a separately created context
1426            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1427            s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1428            s.connect((REMOTE_HOST, 443))
1429            try:
1430                self.assertEqual({}, s.getpeercert())
1431            finally:
1432                s.close()
1433            # Same with a server hostname
1434            s = ctx.wrap_socket(socket.socket(socket.AF_INET),
1435                                server_hostname=REMOTE_HOST)
1436            s.connect((REMOTE_HOST, 443))
1437            s.close()
1438            # This should fail because we have no verification certs
1439            ctx.verify_mode = ssl.CERT_REQUIRED
1440            s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1441            self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed",
1442                                    s.connect, (REMOTE_HOST, 443))
1443            s.close()
1444            # This should succeed because we specify the root cert
1445            ctx.load_verify_locations(REMOTE_ROOT_CERT)
1446            s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1447            s.connect((REMOTE_HOST, 443))
1448            try:
1449                cert = s.getpeercert()
1450                self.assertTrue(cert)
1451            finally:
1452                s.close()
1453
1454    def test_connect_capath(self):
1455        # Verify server certificates using the `capath` argument
1456        # NOTE: the subject hashing algorithm has been changed between
1457        # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
1458        # contain both versions of each certificate (same content, different
1459        # filename) for this test to be portable across OpenSSL releases.
1460        with support.transient_internet(REMOTE_HOST):
1461            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1462            ctx.verify_mode = ssl.CERT_REQUIRED
1463            ctx.load_verify_locations(capath=CAPATH)
1464            s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1465            s.connect((REMOTE_HOST, 443))
1466            try:
1467                cert = s.getpeercert()
1468                self.assertTrue(cert)
1469            finally:
1470                s.close()
1471            # Same with a bytes `capath` argument
1472            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1473            ctx.verify_mode = ssl.CERT_REQUIRED
1474            ctx.load_verify_locations(capath=BYTES_CAPATH)
1475            s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1476            s.connect((REMOTE_HOST, 443))
1477            try:
1478                cert = s.getpeercert()
1479                self.assertTrue(cert)
1480            finally:
1481                s.close()
1482
1483    def test_connect_cadata(self):
1484        with open(REMOTE_ROOT_CERT) as f:
1485            pem = f.read().decode('ascii')
1486        der = ssl.PEM_cert_to_DER_cert(pem)
1487        with support.transient_internet(REMOTE_HOST):
1488            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1489            ctx.verify_mode = ssl.CERT_REQUIRED
1490            ctx.load_verify_locations(cadata=pem)
1491            with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
1492                s.connect((REMOTE_HOST, 443))
1493                cert = s.getpeercert()
1494                self.assertTrue(cert)
1495
1496            # same with DER
1497            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1498            ctx.verify_mode = ssl.CERT_REQUIRED
1499            ctx.load_verify_locations(cadata=der)
1500            with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s:
1501                s.connect((REMOTE_HOST, 443))
1502                cert = s.getpeercert()
1503                self.assertTrue(cert)
1504
1505    @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
1506    def test_makefile_close(self):
1507        # Issue #5238: creating a file-like object with makefile() shouldn't
1508        # delay closing the underlying "real socket" (here tested with its
1509        # file descriptor, hence skipping the test under Windows).
1510        with support.transient_internet(REMOTE_HOST):
1511            ss = ssl.wrap_socket(socket.socket(socket.AF_INET))
1512            ss.connect((REMOTE_HOST, 443))
1513            fd = ss.fileno()
1514            f = ss.makefile()
1515            f.close()
1516            # The fd is still open
1517            os.read(fd, 0)
1518            # Closing the SSL socket should close the fd too
1519            ss.close()
1520            gc.collect()
1521            with self.assertRaises(OSError) as e:
1522                os.read(fd, 0)
1523            self.assertEqual(e.exception.errno, errno.EBADF)
1524
1525    def test_non_blocking_handshake(self):
1526        with support.transient_internet(REMOTE_HOST):
1527            s = socket.socket(socket.AF_INET)
1528            s.connect((REMOTE_HOST, 443))
1529            s.setblocking(False)
1530            s = ssl.wrap_socket(s,
1531                                cert_reqs=ssl.CERT_NONE,
1532                                do_handshake_on_connect=False)
1533            count = 0
1534            while True:
1535                try:
1536                    count += 1
1537                    s.do_handshake()
1538                    break
1539                except ssl.SSLWantReadError:
1540                    select.select([s], [], [])
1541                except ssl.SSLWantWriteError:
1542                    select.select([], [s], [])
1543            s.close()
1544            if support.verbose:
1545                sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count)
1546
1547    def test_get_server_certificate(self):
1548        def _test_get_server_certificate(host, port, cert=None):
1549            with support.transient_internet(host):
1550                pem = ssl.get_server_certificate((host, port))
1551                if not pem:
1552                    self.fail("No server certificate on %s:%s!" % (host, port))
1553
1554                try:
1555                    pem = ssl.get_server_certificate((host, port),
1556                                                     ca_certs=CERTFILE)
1557                except ssl.SSLError as x:
1558                    #should fail
1559                    if support.verbose:
1560                        sys.stdout.write("%s\n" % x)
1561                else:
1562                    self.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
1563                pem = ssl.get_server_certificate((host, port),
1564                                                 ca_certs=cert)
1565                if not pem:
1566                    self.fail("No server certificate on %s:%s!" % (host, port))
1567                if support.verbose:
1568                    sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem))
1569
1570        _test_get_server_certificate(REMOTE_HOST, 443, REMOTE_ROOT_CERT)
1571        if support.IPV6_ENABLED:
1572            _test_get_server_certificate('ipv6.google.com', 443)
1573
1574    def test_ciphers(self):
1575        remote = (REMOTE_HOST, 443)
1576        with support.transient_internet(remote[0]):
1577            with closing(ssl.wrap_socket(socket.socket(socket.AF_INET),
1578                                         cert_reqs=ssl.CERT_NONE, ciphers="ALL")) as s:
1579                s.connect(remote)
1580            with closing(ssl.wrap_socket(socket.socket(socket.AF_INET),
1581                                         cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT")) as s:
1582                s.connect(remote)
1583            # Error checking can happen at instantiation or when connecting
1584            with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
1585                with closing(socket.socket(socket.AF_INET)) as sock:
1586                    s = ssl.wrap_socket(sock,
1587                                        cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx")
1588                    s.connect(remote)
1589
1590    def test_algorithms(self):
1591        # Issue #8484: all algorithms should be available when verifying a
1592        # certificate.
1593        # SHA256 was added in OpenSSL 0.9.8
1594        if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
1595            self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
1596        # sha256.tbs-internet.com needs SNI to use the correct certificate
1597        if not ssl.HAS_SNI:
1598            self.skipTest("SNI needed for this test")
1599        # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host)
1600        remote = ("sha256.tbs-internet.com", 443)
1601        sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
1602        with support.transient_internet("sha256.tbs-internet.com"):
1603            ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1604            ctx.verify_mode = ssl.CERT_REQUIRED
1605            ctx.load_verify_locations(sha256_cert)
1606            s = ctx.wrap_socket(socket.socket(socket.AF_INET),
1607                                server_hostname="sha256.tbs-internet.com")
1608            try:
1609                s.connect(remote)
1610                if support.verbose:
1611                    sys.stdout.write("\nCipher with %r is %r\n" %
1612                                     (remote, s.cipher()))
1613                    sys.stdout.write("Certificate is:\n%s\n" %
1614                                     pprint.pformat(s.getpeercert()))
1615            finally:
1616                s.close()
1617
1618    def test_get_ca_certs_capath(self):
1619        # capath certs are loaded on request
1620        with support.transient_internet(REMOTE_HOST):
1621            ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1622            ctx.verify_mode = ssl.CERT_REQUIRED
1623            ctx.load_verify_locations(capath=CAPATH)
1624            self.assertEqual(ctx.get_ca_certs(), [])
1625            s = ctx.wrap_socket(socket.socket(socket.AF_INET))
1626            s.connect((REMOTE_HOST, 443))
1627            try:
1628                cert = s.getpeercert()
1629                self.assertTrue(cert)
1630            finally:
1631                s.close()
1632            self.assertEqual(len(ctx.get_ca_certs()), 1)
1633
1634    @needs_sni
1635    def test_context_setget(self):
1636        # Check that the context of a connected socket can be replaced.
1637        with support.transient_internet(REMOTE_HOST):
1638            ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
1639            ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
1640            s = socket.socket(socket.AF_INET)
1641            with closing(ctx1.wrap_socket(s)) as ss:
1642                ss.connect((REMOTE_HOST, 443))
1643                self.assertIs(ss.context, ctx1)
1644                self.assertIs(ss._sslobj.context, ctx1)
1645                ss.context = ctx2
1646                self.assertIs(ss.context, ctx2)
1647                self.assertIs(ss._sslobj.context, ctx2)
1648
1649try:
1650    import threading
1651except ImportError:
1652    _have_threads = False
1653else:
1654    _have_threads = True
1655
1656    from test.ssl_servers import make_https_server
1657
1658    class ThreadedEchoServer(threading.Thread):
1659
1660        class ConnectionHandler(threading.Thread):
1661
1662            """A mildly complicated class, because we want it to work both
1663            with and without the SSL wrapper around the socket connection, so
1664            that we can test the STARTTLS functionality."""
1665
1666            def __init__(self, server, connsock, addr):
1667                self.server = server
1668                self.running = False
1669                self.sock = connsock
1670                self.addr = addr
1671                self.sock.setblocking(1)
1672                self.sslconn = None
1673                threading.Thread.__init__(self)
1674                self.daemon = True
1675
1676            def wrap_conn(self):
1677                try:
1678                    self.sslconn = self.server.context.wrap_socket(
1679                        self.sock, server_side=True)
1680                    self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
1681                    self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
1682                except socket.error as e:
1683                    # We treat ConnectionResetError as though it were an
1684                    # SSLError - OpenSSL on Ubuntu abruptly closes the
1685                    # connection when asked to use an unsupported protocol.
1686                    #
1687                    # XXX Various errors can have happened here, for example
1688                    # a mismatching protocol version, an invalid certificate,
1689                    # or a low-level bug. This should be made more discriminating.
1690                    if not isinstance(e, ssl.SSLError) and e.errno != errno.ECONNRESET:
1691                        raise
1692                    self.server.conn_errors.append(e)
1693                    if self.server.chatty:
1694                        handle_error("\n server:  bad connection attempt from " + repr(self.addr) + ":\n")
1695                    self.running = False
1696                    self.server.stop()
1697                    self.close()
1698                    return False
1699                else:
1700                    if self.server.context.verify_mode == ssl.CERT_REQUIRED:
1701                        cert = self.sslconn.getpeercert()
1702                        if support.verbose and self.server.chatty:
1703                            sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n")
1704                        cert_binary = self.sslconn.getpeercert(True)
1705                        if support.verbose and self.server.chatty:
1706                            sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n")
1707                    cipher = self.sslconn.cipher()
1708                    if support.verbose and self.server.chatty:
1709                        sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
1710                        sys.stdout.write(" server: selected protocol is now "
1711                                + str(self.sslconn.selected_npn_protocol()) + "\n")
1712                    return True
1713
1714            def read(self):
1715                if self.sslconn:
1716                    return self.sslconn.read()
1717                else:
1718                    return self.sock.recv(1024)
1719
1720            def write(self, bytes):
1721                if self.sslconn:
1722                    return self.sslconn.write(bytes)
1723                else:
1724                    return self.sock.send(bytes)
1725
1726            def close(self):
1727                if self.sslconn:
1728                    self.sslconn.close()
1729                else:
1730                    self.sock.close()
1731
1732            def run(self):
1733                self.running = True
1734                if not self.server.starttls_server:
1735                    if not self.wrap_conn():
1736                        return
1737                while self.running:
1738                    try:
1739                        msg = self.read()
1740                        stripped = msg.strip()
1741                        if not stripped:
1742                            # eof, so quit this handler
1743                            self.running = False
1744                            self.close()
1745                        elif stripped == b'over':
1746                            if support.verbose and self.server.connectionchatty:
1747                                sys.stdout.write(" server: client closed connection\n")
1748                            self.close()
1749                            return
1750                        elif (self.server.starttls_server and
1751                              stripped == b'STARTTLS'):
1752                            if support.verbose and self.server.connectionchatty:
1753                                sys.stdout.write(" server: read STARTTLS from client, sending OK...\n")
1754                            self.write(b"OK\n")
1755                            if not self.wrap_conn():
1756                                return
1757                        elif (self.server.starttls_server and self.sslconn
1758                              and stripped == b'ENDTLS'):
1759                            if support.verbose and self.server.connectionchatty:
1760                                sys.stdout.write(" server: read ENDTLS from client, sending OK...\n")
1761                            self.write(b"OK\n")
1762                            self.sock = self.sslconn.unwrap()
1763                            self.sslconn = None
1764                            if support.verbose and self.server.connectionchatty:
1765                                sys.stdout.write(" server: connection is now unencrypted...\n")
1766                        elif stripped == b'CB tls-unique':
1767                            if support.verbose and self.server.connectionchatty:
1768                                sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n")
1769                            data = self.sslconn.get_channel_binding("tls-unique")
1770                            self.write(repr(data).encode("us-ascii") + b"\n")
1771                        else:
1772                            if (support.verbose and
1773                                self.server.connectionchatty):
1774                                ctype = (self.sslconn and "encrypted") or "unencrypted"
1775                                sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
1776                                                 % (msg, ctype, msg.lower(), ctype))
1777                            self.write(msg.lower())
1778                    except ssl.SSLError:
1779                        if self.server.chatty:
1780                            handle_error("Test server failure:\n")
1781                        self.close()
1782                        self.running = False
1783                        # normally, we'd just stop here, but for the test
1784                        # harness, we want to stop the server
1785                        self.server.stop()
1786
1787        def __init__(self, certificate=None, ssl_version=None,
1788                     certreqs=None, cacerts=None,
1789                     chatty=True, connectionchatty=False, starttls_server=False,
1790                     npn_protocols=None, alpn_protocols=None,
1791                     ciphers=None, context=None):
1792            if context:
1793                self.context = context
1794            else:
1795                self.context = ssl.SSLContext(ssl_version
1796                                              if ssl_version is not None
1797                                              else ssl.PROTOCOL_TLSv1)
1798                self.context.verify_mode = (certreqs if certreqs is not None
1799                                            else ssl.CERT_NONE)
1800                if cacerts:
1801                    self.context.load_verify_locations(cacerts)
1802                if certificate:
1803                    self.context.load_cert_chain(certificate)
1804                if npn_protocols:
1805                    self.context.set_npn_protocols(npn_protocols)
1806                if alpn_protocols:
1807                    self.context.set_alpn_protocols(alpn_protocols)
1808                if ciphers:
1809                    self.context.set_ciphers(ciphers)
1810            self.chatty = chatty
1811            self.connectionchatty = connectionchatty
1812            self.starttls_server = starttls_server
1813            self.sock = socket.socket()
1814            self.port = support.bind_port(self.sock)
1815            self.flag = None
1816            self.active = False
1817            self.selected_npn_protocols = []
1818            self.selected_alpn_protocols = []
1819            self.conn_errors = []
1820            threading.Thread.__init__(self)
1821            self.daemon = True
1822
1823        def __enter__(self):
1824            self.start(threading.Event())
1825            self.flag.wait()
1826            return self
1827
1828        def __exit__(self, *args):
1829            self.stop()
1830            self.join()
1831
1832        def start(self, flag=None):
1833            self.flag = flag
1834            threading.Thread.start(self)
1835
1836        def run(self):
1837            self.sock.settimeout(0.05)
1838            self.sock.listen(5)
1839            self.active = True
1840            if self.flag:
1841                # signal an event
1842                self.flag.set()
1843            while self.active:
1844                try:
1845                    newconn, connaddr = self.sock.accept()
1846                    if support.verbose and self.chatty:
1847                        sys.stdout.write(' server:  new connection from '
1848                                         + repr(connaddr) + '\n')
1849                    handler = self.ConnectionHandler(self, newconn, connaddr)
1850                    handler.start()
1851                    handler.join()
1852                except socket.timeout:
1853                    pass
1854                except KeyboardInterrupt:
1855                    self.stop()
1856            self.sock.close()
1857
1858        def stop(self):
1859            self.active = False
1860
1861    class AsyncoreEchoServer(threading.Thread):
1862
1863        class EchoServer(asyncore.dispatcher):
1864
1865            class ConnectionHandler(asyncore.dispatcher_with_send):
1866
1867                def __init__(self, conn, certfile):
1868                    self.socket = ssl.wrap_socket(conn, server_side=True,
1869                                                  certfile=certfile,
1870                                                  do_handshake_on_connect=False)
1871                    asyncore.dispatcher_with_send.__init__(self, self.socket)
1872                    self._ssl_accepting = True
1873                    self._do_ssl_handshake()
1874
1875                def readable(self):
1876                    if isinstance(self.socket, ssl.SSLSocket):
1877                        while self.socket.pending() > 0:
1878                            self.handle_read_event()
1879                    return True
1880
1881                def _do_ssl_handshake(self):
1882                    try:
1883                        self.socket.do_handshake()
1884                    except (ssl.SSLWantReadError, ssl.SSLWantWriteError):
1885                        return
1886                    except ssl.SSLEOFError:
1887                        return self.handle_close()
1888                    except ssl.SSLError:
1889                        raise
1890                    except socket.error, err:
1891                        if err.args[0] == errno.ECONNABORTED:
1892                            return self.handle_close()
1893                    else:
1894                        self._ssl_accepting = False
1895
1896                def handle_read(self):
1897                    if self._ssl_accepting:
1898                        self._do_ssl_handshake()
1899                    else:
1900                        data = self.recv(1024)
1901                        if support.verbose:
1902                            sys.stdout.write(" server:  read %s from client\n" % repr(data))
1903                        if not data:
1904                            self.close()
1905                        else:
1906                            self.send(data.lower())
1907
1908                def handle_close(self):
1909                    self.close()
1910                    if support.verbose:
1911                        sys.stdout.write(" server:  closed connection %s\n" % self.socket)
1912
1913                def handle_error(self):
1914                    raise
1915
1916            def __init__(self, certfile):
1917                self.certfile = certfile
1918                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
1919                self.port = support.bind_port(sock, '')
1920                asyncore.dispatcher.__init__(self, sock)
1921                self.listen(5)
1922
1923            def handle_accept(self):
1924                sock_obj, addr = self.accept()
1925                if support.verbose:
1926                    sys.stdout.write(" server:  new connection from %s:%s\n" %addr)
1927                self.ConnectionHandler(sock_obj, self.certfile)
1928
1929            def handle_error(self):
1930                raise
1931
1932        def __init__(self, certfile):
1933            self.flag = None
1934            self.active = False
1935            self.server = self.EchoServer(certfile)
1936            self.port = self.server.port
1937            threading.Thread.__init__(self)
1938            self.daemon = True
1939
1940        def __str__(self):
1941            return "<%s %s>" % (self.__class__.__name__, self.server)
1942
1943        def __enter__(self):
1944            self.start(threading.Event())
1945            self.flag.wait()
1946            return self
1947
1948        def __exit__(self, *args):
1949            if support.verbose:
1950                sys.stdout.write(" cleanup: stopping server.\n")
1951            self.stop()
1952            if support.verbose:
1953                sys.stdout.write(" cleanup: joining server thread.\n")
1954            self.join()
1955            if support.verbose:
1956                sys.stdout.write(" cleanup: successfully joined.\n")
1957
1958        def start(self, flag=None):
1959            self.flag = flag
1960            threading.Thread.start(self)
1961
1962        def run(self):
1963            self.active = True
1964            if self.flag:
1965                self.flag.set()
1966            while self.active:
1967                try:
1968                    asyncore.loop(1)
1969                except:
1970                    pass
1971
1972        def stop(self):
1973            self.active = False
1974            self.server.close()
1975
1976    def server_params_test(client_context, server_context, indata=b"FOO\n",
1977                           chatty=True, connectionchatty=False, sni_name=None):
1978        """
1979        Launch a server, connect a client to it and try various reads
1980        and writes.
1981        """
1982        stats = {}
1983        server = ThreadedEchoServer(context=server_context,
1984                                    chatty=chatty,
1985                                    connectionchatty=False)
1986        with server:
1987            with closing(client_context.wrap_socket(socket.socket(),
1988                    server_hostname=sni_name)) as s:
1989                s.connect((HOST, server.port))
1990                for arg in [indata, bytearray(indata), memoryview(indata)]:
1991                    if connectionchatty:
1992                        if support.verbose:
1993                            sys.stdout.write(
1994                                " client:  sending %r...\n" % indata)
1995                    s.write(arg)
1996                    outdata = s.read()
1997                    if connectionchatty:
1998                        if support.verbose:
1999                            sys.stdout.write(" client:  read %r\n" % outdata)
2000                    if outdata != indata.lower():
2001                        raise AssertionError(
2002                            "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2003                            % (outdata[:20], len(outdata),
2004                               indata[:20].lower(), len(indata)))
2005                s.write(b"over\n")
2006                if connectionchatty:
2007                    if support.verbose:
2008                        sys.stdout.write(" client:  closing connection.\n")
2009                stats.update({
2010                    'compression': s.compression(),
2011                    'cipher': s.cipher(),
2012                    'peercert': s.getpeercert(),
2013                    'client_alpn_protocol': s.selected_alpn_protocol(),
2014                    'client_npn_protocol': s.selected_npn_protocol(),
2015                    'version': s.version(),
2016                })
2017                s.close()
2018            stats['server_alpn_protocols'] = server.selected_alpn_protocols
2019            stats['server_npn_protocols'] = server.selected_npn_protocols
2020        return stats
2021
2022    def try_protocol_combo(server_protocol, client_protocol, expect_success,
2023                           certsreqs=None, server_options=0, client_options=0):
2024        """
2025        Try to SSL-connect using *client_protocol* to *server_protocol*.
2026        If *expect_success* is true, assert that the connection succeeds,
2027        if it's false, assert that the connection fails.
2028        Also, if *expect_success* is a string, assert that it is the protocol
2029        version actually used by the connection.
2030        """
2031        if certsreqs is None:
2032            certsreqs = ssl.CERT_NONE
2033        certtype = {
2034            ssl.CERT_NONE: "CERT_NONE",
2035            ssl.CERT_OPTIONAL: "CERT_OPTIONAL",
2036            ssl.CERT_REQUIRED: "CERT_REQUIRED",
2037        }[certsreqs]
2038        if support.verbose:
2039            formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n"
2040            sys.stdout.write(formatstr %
2041                             (ssl.get_protocol_name(client_protocol),
2042                              ssl.get_protocol_name(server_protocol),
2043                              certtype))
2044        client_context = ssl.SSLContext(client_protocol)
2045        client_context.options |= client_options
2046        server_context = ssl.SSLContext(server_protocol)
2047        server_context.options |= server_options
2048
2049        # NOTE: we must enable "ALL" ciphers on the client, otherwise an
2050        # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
2051        # starting from OpenSSL 1.0.0 (see issue #8322).
2052        if client_context.protocol == ssl.PROTOCOL_SSLv23:
2053            client_context.set_ciphers("ALL")
2054
2055        for ctx in (client_context, server_context):
2056            ctx.verify_mode = certsreqs
2057            ctx.load_cert_chain(CERTFILE)
2058            ctx.load_verify_locations(CERTFILE)
2059        try:
2060            stats = server_params_test(client_context, server_context,
2061                                       chatty=False, connectionchatty=False)
2062        # Protocol mismatch can result in either an SSLError, or a
2063        # "Connection reset by peer" error.
2064        except ssl.SSLError:
2065            if expect_success:
2066                raise
2067        except socket.error as e:
2068            if expect_success or e.errno != errno.ECONNRESET:
2069                raise
2070        else:
2071            if not expect_success:
2072                raise AssertionError(
2073                    "Client protocol %s succeeded with server protocol %s!"
2074                    % (ssl.get_protocol_name(client_protocol),
2075                       ssl.get_protocol_name(server_protocol)))
2076            elif (expect_success is not True
2077                  and expect_success != stats['version']):
2078                raise AssertionError("version mismatch: expected %r, got %r"
2079                                     % (expect_success, stats['version']))
2080
2081
2082    class ThreadedTests(unittest.TestCase):
2083
2084        @skip_if_broken_ubuntu_ssl
2085        def test_echo(self):
2086            """Basic test of an SSL client connecting to a server"""
2087            if support.verbose:
2088                sys.stdout.write("\n")
2089            for protocol in PROTOCOLS:
2090                context = ssl.SSLContext(protocol)
2091                context.load_cert_chain(CERTFILE)
2092                server_params_test(context, context,
2093                                   chatty=True, connectionchatty=True)
2094
2095        def test_getpeercert(self):
2096            if support.verbose:
2097                sys.stdout.write("\n")
2098            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2099            context.verify_mode = ssl.CERT_REQUIRED
2100            context.load_verify_locations(CERTFILE)
2101            context.load_cert_chain(CERTFILE)
2102            server = ThreadedEchoServer(context=context, chatty=False)
2103            with server:
2104                s = context.wrap_socket(socket.socket(),
2105                                        do_handshake_on_connect=False)
2106                s.connect((HOST, server.port))
2107                # getpeercert() raise ValueError while the handshake isn't
2108                # done.
2109                with self.assertRaises(ValueError):
2110                    s.getpeercert()
2111                s.do_handshake()
2112                cert = s.getpeercert()
2113                self.assertTrue(cert, "Can't get peer certificate.")
2114                cipher = s.cipher()
2115                if support.verbose:
2116                    sys.stdout.write(pprint.pformat(cert) + '\n')
2117                    sys.stdout.write("Connection cipher is " + str(cipher) + '.\n')
2118                if 'subject' not in cert:
2119                    self.fail("No subject field in certificate: %s." %
2120                              pprint.pformat(cert))
2121                if ((('organizationName', 'Python Software Foundation'),)
2122                    not in cert['subject']):
2123                    self.fail(
2124                        "Missing or invalid 'organizationName' field in certificate subject; "
2125                        "should be 'Python Software Foundation'.")
2126                self.assertIn('notBefore', cert)
2127                self.assertIn('notAfter', cert)
2128                before = ssl.cert_time_to_seconds(cert['notBefore'])
2129                after = ssl.cert_time_to_seconds(cert['notAfter'])
2130                self.assertLess(before, after)
2131                s.close()
2132
2133        @unittest.skipUnless(have_verify_flags(),
2134                            "verify_flags need OpenSSL > 0.9.8")
2135        def test_crl_check(self):
2136            if support.verbose:
2137                sys.stdout.write("\n")
2138
2139            server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2140            server_context.load_cert_chain(SIGNED_CERTFILE)
2141
2142            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2143            context.verify_mode = ssl.CERT_REQUIRED
2144            context.load_verify_locations(SIGNING_CA)
2145            tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0)
2146            self.assertEqual(context.verify_flags, ssl.VERIFY_DEFAULT | tf)
2147
2148            # VERIFY_DEFAULT should pass
2149            server = ThreadedEchoServer(context=server_context, chatty=True)
2150            with server:
2151                with closing(context.wrap_socket(socket.socket())) as s:
2152                    s.connect((HOST, server.port))
2153                    cert = s.getpeercert()
2154                    self.assertTrue(cert, "Can't get peer certificate.")
2155
2156            # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails
2157            context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
2158
2159            server = ThreadedEchoServer(context=server_context, chatty=True)
2160            with server:
2161                with closing(context.wrap_socket(socket.socket())) as s:
2162                    with self.assertRaisesRegexp(ssl.SSLError,
2163                                                "certificate verify failed"):
2164                        s.connect((HOST, server.port))
2165
2166            # now load a CRL file. The CRL file is signed by the CA.
2167            context.load_verify_locations(CRLFILE)
2168
2169            server = ThreadedEchoServer(context=server_context, chatty=True)
2170            with server:
2171                with closing(context.wrap_socket(socket.socket())) as s:
2172                    s.connect((HOST, server.port))
2173                    cert = s.getpeercert()
2174                    self.assertTrue(cert, "Can't get peer certificate.")
2175
2176        def test_check_hostname(self):
2177            if support.verbose:
2178                sys.stdout.write("\n")
2179
2180            server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2181            server_context.load_cert_chain(SIGNED_CERTFILE)
2182
2183            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2184            context.verify_mode = ssl.CERT_REQUIRED
2185            context.check_hostname = True
2186            context.load_verify_locations(SIGNING_CA)
2187
2188            # correct hostname should verify
2189            server = ThreadedEchoServer(context=server_context, chatty=True)
2190            with server:
2191                with closing(context.wrap_socket(socket.socket(),
2192                                                 server_hostname="localhost")) as s:
2193                    s.connect((HOST, server.port))
2194                    cert = s.getpeercert()
2195                    self.assertTrue(cert, "Can't get peer certificate.")
2196
2197            # incorrect hostname should raise an exception
2198            server = ThreadedEchoServer(context=server_context, chatty=True)
2199            with server:
2200                with closing(context.wrap_socket(socket.socket(),
2201                                                 server_hostname="invalid")) as s:
2202                    with self.assertRaisesRegexp(ssl.CertificateError,
2203                                                "hostname 'invalid' doesn't match u?'localhost'"):
2204                        s.connect((HOST, server.port))
2205
2206            # missing server_hostname arg should cause an exception, too
2207            server = ThreadedEchoServer(context=server_context, chatty=True)
2208            with server:
2209                with closing(socket.socket()) as s:
2210                    with self.assertRaisesRegexp(ValueError,
2211                                                "check_hostname requires server_hostname"):
2212                        context.wrap_socket(s)
2213
2214        def test_wrong_cert(self):
2215            """Connecting when the server rejects the client's certificate
2216
2217            Launch a server with CERT_REQUIRED, and check that trying to
2218            connect to it with a wrong client certificate fails.
2219            """
2220            certfile = os.path.join(os.path.dirname(__file__) or os.curdir,
2221                                       "wrongcert.pem")
2222            server = ThreadedEchoServer(CERTFILE,
2223                                        certreqs=ssl.CERT_REQUIRED,
2224                                        cacerts=CERTFILE, chatty=False,
2225                                        connectionchatty=False)
2226            with server, \
2227                    closing(socket.socket()) as sock, \
2228                    closing(ssl.wrap_socket(sock,
2229                                        certfile=certfile,
2230                                        ssl_version=ssl.PROTOCOL_TLSv1)) as s:
2231                try:
2232                    # Expect either an SSL error about the server rejecting
2233                    # the connection, or a low-level connection reset (which
2234                    # sometimes happens on Windows)
2235                    s.connect((HOST, server.port))
2236                except ssl.SSLError as e:
2237                    if support.verbose:
2238                        sys.stdout.write("\nSSLError is %r\n" % e)
2239                except socket.error as e:
2240                    if e.errno != errno.ECONNRESET:
2241                        raise
2242                    if support.verbose:
2243                        sys.stdout.write("\nsocket.error is %r\n" % e)
2244                else:
2245                    self.fail("Use of invalid cert should have failed!")
2246
2247        def test_rude_shutdown(self):
2248            """A brutal shutdown of an SSL server should raise an OSError
2249            in the client when attempting handshake.
2250            """
2251            listener_ready = threading.Event()
2252            listener_gone = threading.Event()
2253
2254            s = socket.socket()
2255            port = support.bind_port(s, HOST)
2256
2257            # `listener` runs in a thread.  It sits in an accept() until
2258            # the main thread connects.  Then it rudely closes the socket,
2259            # and sets Event `listener_gone` to let the main thread know
2260            # the socket is gone.
2261            def listener():
2262                s.listen(5)
2263                listener_ready.set()
2264                newsock, addr = s.accept()
2265                newsock.close()
2266                s.close()
2267                listener_gone.set()
2268
2269            def connector():
2270                listener_ready.wait()
2271                with closing(socket.socket()) as c:
2272                    c.connect((HOST, port))
2273                    listener_gone.wait()
2274                    try:
2275                        ssl_sock = ssl.wrap_socket(c)
2276                    except socket.error:
2277                        pass
2278                    else:
2279                        self.fail('connecting to closed SSL socket should have failed')
2280
2281            t = threading.Thread(target=listener)
2282            t.start()
2283            try:
2284                connector()
2285            finally:
2286                t.join()
2287
2288        @skip_if_broken_ubuntu_ssl
2289        @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'),
2290                             "OpenSSL is compiled without SSLv2 support")
2291        def test_protocol_sslv2(self):
2292            """Connecting to an SSLv2 server with various client options"""
2293            if support.verbose:
2294                sys.stdout.write("\n")
2295            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
2296            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
2297            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
2298            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False)
2299            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
2300            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
2301            # SSLv23 client with specific SSL options
2302            if no_sslv2_implies_sslv3_hello():
2303                # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
2304                try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2305                                   client_options=ssl.OP_NO_SSLv2)
2306            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2307                               client_options=ssl.OP_NO_SSLv3)
2308            try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False,
2309                               client_options=ssl.OP_NO_TLSv1)
2310
2311        @skip_if_broken_ubuntu_ssl
2312        def test_protocol_sslv23(self):
2313            """Connecting to an SSLv23 server with various client options"""
2314            if support.verbose:
2315                sys.stdout.write("\n")
2316            if hasattr(ssl, 'PROTOCOL_SSLv2'):
2317                try:
2318                    try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True)
2319                except socket.error as x:
2320                    # this fails on some older versions of OpenSSL (0.9.7l, for instance)
2321                    if support.verbose:
2322                        sys.stdout.write(
2323                            " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n"
2324                            % str(x))
2325            if hasattr(ssl, 'PROTOCOL_SSLv3'):
2326                try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False)
2327            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True)
2328            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1')
2329
2330            if hasattr(ssl, 'PROTOCOL_SSLv3'):
2331                try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL)
2332            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL)
2333            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
2334
2335            if hasattr(ssl, 'PROTOCOL_SSLv3'):
2336                try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED)
2337            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED)
2338            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
2339
2340            # Server with specific SSL options
2341            if hasattr(ssl, 'PROTOCOL_SSLv3'):
2342                try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False,
2343                               server_options=ssl.OP_NO_SSLv3)
2344            # Will choose TLSv1
2345            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True,
2346                               server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
2347            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, False,
2348                               server_options=ssl.OP_NO_TLSv1)
2349
2350
2351        @skip_if_broken_ubuntu_ssl
2352        @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'),
2353                             "OpenSSL is compiled without SSLv3 support")
2354        def test_protocol_sslv3(self):
2355            """Connecting to an SSLv3 server with various client options"""
2356            if support.verbose:
2357                sys.stdout.write("\n")
2358            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3')
2359            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL)
2360            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED)
2361            if hasattr(ssl, 'PROTOCOL_SSLv2'):
2362                try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False)
2363            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False,
2364                               client_options=ssl.OP_NO_SSLv3)
2365            try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False)
2366            if no_sslv2_implies_sslv3_hello():
2367                # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs
2368                try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23,
2369                                   False, client_options=ssl.OP_NO_SSLv2)
2370
2371        @skip_if_broken_ubuntu_ssl
2372        def test_protocol_tlsv1(self):
2373            """Connecting to a TLSv1 server with various client options"""
2374            if support.verbose:
2375                sys.stdout.write("\n")
2376            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1')
2377            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL)
2378            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED)
2379            if hasattr(ssl, 'PROTOCOL_SSLv2'):
2380                try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False)
2381            if hasattr(ssl, 'PROTOCOL_SSLv3'):
2382                try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False)
2383            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False,
2384                               client_options=ssl.OP_NO_TLSv1)
2385
2386        @skip_if_broken_ubuntu_ssl
2387        @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"),
2388                             "TLS version 1.1 not supported.")
2389        def test_protocol_tlsv1_1(self):
2390            """Connecting to a TLSv1.1 server with various client options.
2391               Testing against older TLS versions."""
2392            if support.verbose:
2393                sys.stdout.write("\n")
2394            try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
2395            if hasattr(ssl, 'PROTOCOL_SSLv2'):
2396                try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False)
2397            if hasattr(ssl, 'PROTOCOL_SSLv3'):
2398                try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False)
2399            try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv23, False,
2400                               client_options=ssl.OP_NO_TLSv1_1)
2401
2402            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1')
2403            try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False)
2404            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False)
2405
2406
2407        @skip_if_broken_ubuntu_ssl
2408        @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"),
2409                             "TLS version 1.2 not supported.")
2410        def test_protocol_tlsv1_2(self):
2411            """Connecting to a TLSv1.2 server with various client options.
2412               Testing against older TLS versions."""
2413            if support.verbose:
2414                sys.stdout.write("\n")
2415            try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2',
2416                               server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,
2417                               client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,)
2418            if hasattr(ssl, 'PROTOCOL_SSLv2'):
2419                try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False)
2420            if hasattr(ssl, 'PROTOCOL_SSLv3'):
2421                try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False)
2422            try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv23, False,
2423                               client_options=ssl.OP_NO_TLSv1_2)
2424
2425            try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2')
2426            try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False)
2427            try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False)
2428            try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False)
2429            try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False)
2430
2431        def test_starttls(self):
2432            """Switching from clear text to encrypted and back again."""
2433            msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6")
2434
2435            server = ThreadedEchoServer(CERTFILE,
2436                                        ssl_version=ssl.PROTOCOL_TLSv1,
2437                                        starttls_server=True,
2438                                        chatty=True,
2439                                        connectionchatty=True)
2440            wrapped = False
2441            with server:
2442                s = socket.socket()
2443                s.setblocking(1)
2444                s.connect((HOST, server.port))
2445                if support.verbose:
2446                    sys.stdout.write("\n")
2447                for indata in msgs:
2448                    if support.verbose:
2449                        sys.stdout.write(
2450                            " client:  sending %r...\n" % indata)
2451                    if wrapped:
2452                        conn.write(indata)
2453                        outdata = conn.read()
2454                    else:
2455                        s.send(indata)
2456                        outdata = s.recv(1024)
2457                    msg = outdata.strip().lower()
2458                    if indata == b"STARTTLS" and msg.startswith(b"ok"):
2459                        # STARTTLS ok, switch to secure mode
2460                        if support.verbose:
2461                            sys.stdout.write(
2462                                " client:  read %r from server, starting TLS...\n"
2463                                % msg)
2464                        conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1)
2465                        wrapped = True
2466                    elif indata == b"ENDTLS" and msg.startswith(b"ok"):
2467                        # ENDTLS ok, switch back to clear text
2468                        if support.verbose:
2469                            sys.stdout.write(
2470                                " client:  read %r from server, ending TLS...\n"
2471                                % msg)
2472                        s = conn.unwrap()
2473                        wrapped = False
2474                    else:
2475                        if support.verbose:
2476                            sys.stdout.write(
2477                                " client:  read %r from server\n" % msg)
2478                if support.verbose:
2479                    sys.stdout.write(" client:  closing connection.\n")
2480                if wrapped:
2481                    conn.write(b"over\n")
2482                else:
2483                    s.send(b"over\n")
2484                if wrapped:
2485                    conn.close()
2486                else:
2487                    s.close()
2488
2489        def test_socketserver(self):
2490            """Using a SocketServer to create and manage SSL connections."""
2491            server = make_https_server(self, certfile=CERTFILE)
2492            # try to connect
2493            if support.verbose:
2494                sys.stdout.write('\n')
2495            with open(CERTFILE, 'rb') as f:
2496                d1 = f.read()
2497            d2 = ''
2498            # now fetch the same data from the HTTPS server
2499            url = 'https://localhost:%d/%s' % (
2500                server.port, os.path.split(CERTFILE)[1])
2501            context = ssl.create_default_context(cafile=CERTFILE)
2502            f = urllib2.urlopen(url, context=context)
2503            try:
2504                dlen = f.info().getheader("content-length")
2505                if dlen and (int(dlen) > 0):
2506                    d2 = f.read(int(dlen))
2507                    if support.verbose:
2508                        sys.stdout.write(
2509                            " client: read %d bytes from remote server '%s'\n"
2510                            % (len(d2), server))
2511            finally:
2512                f.close()
2513            self.assertEqual(d1, d2)
2514
2515        def test_asyncore_server(self):
2516            """Check the example asyncore integration."""
2517            if support.verbose:
2518                sys.stdout.write("\n")
2519
2520            indata = b"FOO\n"
2521            server = AsyncoreEchoServer(CERTFILE)
2522            with server:
2523                s = ssl.wrap_socket(socket.socket())
2524                s.connect(('127.0.0.1', server.port))
2525                if support.verbose:
2526                    sys.stdout.write(
2527                        " client:  sending %r...\n" % indata)
2528                s.write(indata)
2529                outdata = s.read()
2530                if support.verbose:
2531                    sys.stdout.write(" client:  read %r\n" % outdata)
2532                if outdata != indata.lower():
2533                    self.fail(
2534                        "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n"
2535                        % (outdata[:20], len(outdata),
2536                           indata[:20].lower(), len(indata)))
2537                s.write(b"over\n")
2538                if support.verbose:
2539                    sys.stdout.write(" client:  closing connection.\n")
2540                s.close()
2541                if support.verbose:
2542                    sys.stdout.write(" client:  connection closed.\n")
2543
2544        def test_recv_send(self):
2545            """Test recv(), send() and friends."""
2546            if support.verbose:
2547                sys.stdout.write("\n")
2548
2549            server = ThreadedEchoServer(CERTFILE,
2550                                        certreqs=ssl.CERT_NONE,
2551                                        ssl_version=ssl.PROTOCOL_TLSv1,
2552                                        cacerts=CERTFILE,
2553                                        chatty=True,
2554                                        connectionchatty=False)
2555            with server:
2556                s = ssl.wrap_socket(socket.socket(),
2557                                    server_side=False,
2558                                    certfile=CERTFILE,
2559                                    ca_certs=CERTFILE,
2560                                    cert_reqs=ssl.CERT_NONE,
2561                                    ssl_version=ssl.PROTOCOL_TLSv1)
2562                s.connect((HOST, server.port))
2563                # helper methods for standardising recv* method signatures
2564                def _recv_into():
2565                    b = bytearray(b"\0"*100)
2566                    count = s.recv_into(b)
2567                    return b[:count]
2568
2569                def _recvfrom_into():
2570                    b = bytearray(b"\0"*100)
2571                    count, addr = s.recvfrom_into(b)
2572                    return b[:count]
2573
2574                # (name, method, whether to expect success, *args)
2575                send_methods = [
2576                    ('send', s.send, True, []),
2577                    ('sendto', s.sendto, False, ["some.address"]),
2578                    ('sendall', s.sendall, True, []),
2579                ]
2580                recv_methods = [
2581                    ('recv', s.recv, True, []),
2582                    ('recvfrom', s.recvfrom, False, ["some.address"]),
2583                    ('recv_into', _recv_into, True, []),
2584                    ('recvfrom_into', _recvfrom_into, False, []),
2585                ]
2586                data_prefix = u"PREFIX_"
2587
2588                for meth_name, send_meth, expect_success, args in send_methods:
2589                    indata = (data_prefix + meth_name).encode('ascii')
2590                    try:
2591                        send_meth(indata, *args)
2592                        outdata = s.read()
2593                        if outdata != indata.lower():
2594                            self.fail(
2595                                "While sending with <<{name:s}>> bad data "
2596                                "<<{outdata:r}>> ({nout:d}) received; "
2597                                "expected <<{indata:r}>> ({nin:d})\n".format(
2598                                    name=meth_name, outdata=outdata[:20],
2599                                    nout=len(outdata),
2600                                    indata=indata[:20], nin=len(indata)
2601                                )
2602                            )
2603                    except ValueError as e:
2604                        if expect_success:
2605                            self.fail(
2606                                "Failed to send with method <<{name:s}>>; "
2607                                "expected to succeed.\n".format(name=meth_name)
2608                            )
2609                        if not str(e).startswith(meth_name):
2610                            self.fail(
2611                                "Method <<{name:s}>> failed with unexpected "
2612                                "exception message: {exp:s}\n".format(
2613                                    name=meth_name, exp=e
2614                                )
2615                            )
2616
2617                for meth_name, recv_meth, expect_success, args in recv_methods:
2618                    indata = (data_prefix + meth_name).encode('ascii')
2619                    try:
2620                        s.send(indata)
2621                        outdata = recv_meth(*args)
2622                        if outdata != indata.lower():
2623                            self.fail(
2624                                "While receiving with <<{name:s}>> bad data "
2625                                "<<{outdata:r}>> ({nout:d}) received; "
2626                                "expected <<{indata:r}>> ({nin:d})\n".format(
2627                                    name=meth_name, outdata=outdata[:20],
2628                                    nout=len(outdata),
2629                                    indata=indata[:20], nin=len(indata)
2630                                )
2631                            )
2632                    except ValueError as e:
2633                        if expect_success:
2634                            self.fail(
2635                                "Failed to receive with method <<{name:s}>>; "
2636                                "expected to succeed.\n".format(name=meth_name)
2637                            )
2638                        if not str(e).startswith(meth_name):
2639                            self.fail(
2640                                "Method <<{name:s}>> failed with unexpected "
2641                                "exception message: {exp:s}\n".format(
2642                                    name=meth_name, exp=e
2643                                )
2644                            )
2645                        # consume data
2646                        s.read()
2647
2648                # read(-1, buffer) is supported, even though read(-1) is not
2649                data = b"data"
2650                s.send(data)
2651                buffer = bytearray(len(data))
2652                self.assertEqual(s.read(-1, buffer), len(data))
2653                self.assertEqual(buffer, data)
2654
2655                s.write(b"over\n")
2656
2657                self.assertRaises(ValueError, s.recv, -1)
2658                self.assertRaises(ValueError, s.read, -1)
2659
2660                s.close()
2661
2662        def test_recv_zero(self):
2663            server = ThreadedEchoServer(CERTFILE)
2664            server.__enter__()
2665            self.addCleanup(server.__exit__, None, None)
2666            s = socket.create_connection((HOST, server.port))
2667            self.addCleanup(s.close)
2668            s = ssl.wrap_socket(s, suppress_ragged_eofs=False)
2669            self.addCleanup(s.close)
2670
2671            # recv/read(0) should return no data
2672            s.send(b"data")
2673            self.assertEqual(s.recv(0), b"")
2674            self.assertEqual(s.read(0), b"")
2675            self.assertEqual(s.read(), b"data")
2676
2677            # Should not block if the other end sends no data
2678            s.setblocking(False)
2679            self.assertEqual(s.recv(0), b"")
2680            self.assertEqual(s.recv_into(bytearray()), 0)
2681
2682        def test_handshake_timeout(self):
2683            # Issue #5103: SSL handshake must respect the socket timeout
2684            server = socket.socket(socket.AF_INET)
2685            host = "127.0.0.1"
2686            port = support.bind_port(server)
2687            started = threading.Event()
2688            finish = False
2689
2690            def serve():
2691                server.listen(5)
2692                started.set()
2693                conns = []
2694                while not finish:
2695                    r, w, e = select.select([server], [], [], 0.1)
2696                    if server in r:
2697                        # Let the socket hang around rather than having
2698                        # it closed by garbage collection.
2699                        conns.append(server.accept()[0])
2700                for sock in conns:
2701                    sock.close()
2702
2703            t = threading.Thread(target=serve)
2704            t.start()
2705            started.wait()
2706
2707            try:
2708                try:
2709                    c = socket.socket(socket.AF_INET)
2710                    c.settimeout(0.2)
2711                    c.connect((host, port))
2712                    # Will attempt handshake and time out
2713                    self.assertRaisesRegexp(ssl.SSLError, "timed out",
2714                                            ssl.wrap_socket, c)
2715                finally:
2716                    c.close()
2717                try:
2718                    c = socket.socket(socket.AF_INET)
2719                    c = ssl.wrap_socket(c)
2720                    c.settimeout(0.2)
2721                    # Will attempt handshake and time out
2722                    self.assertRaisesRegexp(ssl.SSLError, "timed out",
2723                                            c.connect, (host, port))
2724                finally:
2725                    c.close()
2726            finally:
2727                finish = True
2728                t.join()
2729                server.close()
2730
2731        def test_server_accept(self):
2732            # Issue #16357: accept() on a SSLSocket created through
2733            # SSLContext.wrap_socket().
2734            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2735            context.verify_mode = ssl.CERT_REQUIRED
2736            context.load_verify_locations(CERTFILE)
2737            context.load_cert_chain(CERTFILE)
2738            server = socket.socket(socket.AF_INET)
2739            host = "127.0.0.1"
2740            port = support.bind_port(server)
2741            server = context.wrap_socket(server, server_side=True)
2742
2743            evt = threading.Event()
2744            remote = [None]
2745            peer = [None]
2746            def serve():
2747                server.listen(5)
2748                # Block on the accept and wait on the connection to close.
2749                evt.set()
2750                remote[0], peer[0] = server.accept()
2751                remote[0].recv(1)
2752
2753            t = threading.Thread(target=serve)
2754            t.start()
2755            # Client wait until server setup and perform a connect.
2756            evt.wait()
2757            client = context.wrap_socket(socket.socket())
2758            client.connect((host, port))
2759            client_addr = client.getsockname()
2760            client.close()
2761            t.join()
2762            remote[0].close()
2763            server.close()
2764            # Sanity checks.
2765            self.assertIsInstance(remote[0], ssl.SSLSocket)
2766            self.assertEqual(peer[0], client_addr)
2767
2768        def test_getpeercert_enotconn(self):
2769            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2770            with closing(context.wrap_socket(socket.socket())) as sock:
2771                with self.assertRaises(socket.error) as cm:
2772                    sock.getpeercert()
2773                self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2774
2775        def test_do_handshake_enotconn(self):
2776            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2777            with closing(context.wrap_socket(socket.socket())) as sock:
2778                with self.assertRaises(socket.error) as cm:
2779                    sock.do_handshake()
2780                self.assertEqual(cm.exception.errno, errno.ENOTCONN)
2781
2782        def test_default_ciphers(self):
2783            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2784            try:
2785                # Force a set of weak ciphers on our client context
2786                context.set_ciphers("DES")
2787            except ssl.SSLError:
2788                self.skipTest("no DES cipher available")
2789            with ThreadedEchoServer(CERTFILE,
2790                                    ssl_version=ssl.PROTOCOL_SSLv23,
2791                                    chatty=False) as server:
2792                with closing(context.wrap_socket(socket.socket())) as s:
2793                    with self.assertRaises(ssl.SSLError):
2794                        s.connect((HOST, server.port))
2795            self.assertIn("no shared cipher", str(server.conn_errors[0]))
2796
2797        def test_version_basic(self):
2798            """
2799            Basic tests for SSLSocket.version().
2800            More tests are done in the test_protocol_*() methods.
2801            """
2802            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2803            with ThreadedEchoServer(CERTFILE,
2804                                    ssl_version=ssl.PROTOCOL_TLSv1,
2805                                    chatty=False) as server:
2806                with closing(context.wrap_socket(socket.socket())) as s:
2807                    self.assertIs(s.version(), None)
2808                    s.connect((HOST, server.port))
2809                    self.assertEqual(s.version(), 'TLSv1')
2810                self.assertIs(s.version(), None)
2811
2812        @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
2813        def test_default_ecdh_curve(self):
2814            # Issue #21015: elliptic curve-based Diffie Hellman key exchange
2815            # should be enabled by default on SSL contexts.
2816            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
2817            context.load_cert_chain(CERTFILE)
2818            # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
2819            # explicitly using the 'ECCdraft' cipher alias.  Otherwise,
2820            # our default cipher list should prefer ECDH-based ciphers
2821            # automatically.
2822            if ssl.OPENSSL_VERSION_INFO < (1, 0, 0):
2823                context.set_ciphers("ECCdraft:ECDH")
2824            with ThreadedEchoServer(context=context) as server:
2825                with closing(context.wrap_socket(socket.socket())) as s:
2826                    s.connect((HOST, server.port))
2827                    self.assertIn("ECDH", s.cipher()[0])
2828
2829        @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES,
2830                             "'tls-unique' channel binding not available")
2831        def test_tls_unique_channel_binding(self):
2832            """Test tls-unique channel binding."""
2833            if support.verbose:
2834                sys.stdout.write("\n")
2835
2836            server = ThreadedEchoServer(CERTFILE,
2837                                        certreqs=ssl.CERT_NONE,
2838                                        ssl_version=ssl.PROTOCOL_TLSv1,
2839                                        cacerts=CERTFILE,
2840                                        chatty=True,
2841                                        connectionchatty=False)
2842            with server:
2843                s = ssl.wrap_socket(socket.socket(),
2844                                    server_side=False,
2845                                    certfile=CERTFILE,
2846                                    ca_certs=CERTFILE,
2847                                    cert_reqs=ssl.CERT_NONE,
2848                                    ssl_version=ssl.PROTOCOL_TLSv1)
2849                s.connect((HOST, server.port))
2850                # get the data
2851                cb_data = s.get_channel_binding("tls-unique")
2852                if support.verbose:
2853                    sys.stdout.write(" got channel binding data: {0!r}\n"
2854                                     .format(cb_data))
2855
2856                # check if it is sane
2857                self.assertIsNotNone(cb_data)
2858                self.assertEqual(len(cb_data), 12) # True for TLSv1
2859
2860                # and compare with the peers version
2861                s.write(b"CB tls-unique\n")
2862                peer_data_repr = s.read().strip()
2863                self.assertEqual(peer_data_repr,
2864                                 repr(cb_data).encode("us-ascii"))
2865                s.close()
2866
2867                # now, again
2868                s = ssl.wrap_socket(socket.socket(),
2869                                    server_side=False,
2870                                    certfile=CERTFILE,
2871                                    ca_certs=CERTFILE,
2872                                    cert_reqs=ssl.CERT_NONE,
2873                                    ssl_version=ssl.PROTOCOL_TLSv1)
2874                s.connect((HOST, server.port))
2875                new_cb_data = s.get_channel_binding("tls-unique")
2876                if support.verbose:
2877                    sys.stdout.write(" got another channel binding data: {0!r}\n"
2878                                     .format(new_cb_data))
2879                # is it really unique
2880                self.assertNotEqual(cb_data, new_cb_data)
2881                self.assertIsNotNone(cb_data)
2882                self.assertEqual(len(cb_data), 12) # True for TLSv1
2883                s.write(b"CB tls-unique\n")
2884                peer_data_repr = s.read().strip()
2885                self.assertEqual(peer_data_repr,
2886                                 repr(new_cb_data).encode("us-ascii"))
2887                s.close()
2888
2889        def test_compression(self):
2890            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2891            context.load_cert_chain(CERTFILE)
2892            stats = server_params_test(context, context,
2893                                       chatty=True, connectionchatty=True)
2894            if support.verbose:
2895                sys.stdout.write(" got compression: {!r}\n".format(stats['compression']))
2896            self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' })
2897
2898        @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'),
2899                             "ssl.OP_NO_COMPRESSION needed for this test")
2900        def test_compression_disabled(self):
2901            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2902            context.load_cert_chain(CERTFILE)
2903            context.options |= ssl.OP_NO_COMPRESSION
2904            stats = server_params_test(context, context,
2905                                       chatty=True, connectionchatty=True)
2906            self.assertIs(stats['compression'], None)
2907
2908        def test_dh_params(self):
2909            # Check we can get a connection with ephemeral Diffie-Hellman
2910            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2911            context.load_cert_chain(CERTFILE)
2912            context.load_dh_params(DHFILE)
2913            context.set_ciphers("kEDH")
2914            stats = server_params_test(context, context,
2915                                       chatty=True, connectionchatty=True)
2916            cipher = stats["cipher"][0]
2917            parts = cipher.split("-")
2918            if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts:
2919                self.fail("Non-DH cipher: " + cipher[0])
2920
2921        def test_selected_alpn_protocol(self):
2922            # selected_alpn_protocol() is None unless ALPN is used.
2923            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2924            context.load_cert_chain(CERTFILE)
2925            stats = server_params_test(context, context,
2926                                       chatty=True, connectionchatty=True)
2927            self.assertIs(stats['client_alpn_protocol'], None)
2928
2929        @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required")
2930        def test_selected_alpn_protocol_if_server_uses_alpn(self):
2931            # selected_alpn_protocol() is None unless ALPN is used by the client.
2932            client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2933            client_context.load_verify_locations(CERTFILE)
2934            server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2935            server_context.load_cert_chain(CERTFILE)
2936            server_context.set_alpn_protocols(['foo', 'bar'])
2937            stats = server_params_test(client_context, server_context,
2938                                       chatty=True, connectionchatty=True)
2939            self.assertIs(stats['client_alpn_protocol'], None)
2940
2941        @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
2942        def test_alpn_protocols(self):
2943            server_protocols = ['foo', 'bar', 'milkshake']
2944            protocol_tests = [
2945                (['foo', 'bar'], 'foo'),
2946                (['bar', 'foo'], 'foo'),
2947                (['milkshake'], 'milkshake'),
2948                (['http/3.0', 'http/4.0'], None)
2949            ]
2950            for client_protocols, expected in protocol_tests:
2951                server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
2952                server_context.load_cert_chain(CERTFILE)
2953                server_context.set_alpn_protocols(server_protocols)
2954                client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
2955                client_context.load_cert_chain(CERTFILE)
2956                client_context.set_alpn_protocols(client_protocols)
2957
2958                try:
2959                    stats = server_params_test(client_context,
2960                                               server_context,
2961                                               chatty=True,
2962                                               connectionchatty=True)
2963                except ssl.SSLError as e:
2964                    stats = e
2965
2966                if expected is None and IS_OPENSSL_1_1:
2967                    # OpenSSL 1.1.0 raises handshake error
2968                    self.assertIsInstance(stats, ssl.SSLError)
2969                else:
2970                    msg = "failed trying %s (s) and %s (c).\n" \
2971                        "was expecting %s, but got %%s from the %%s" \
2972                            % (str(server_protocols), str(client_protocols),
2973                                str(expected))
2974                    client_result = stats['client_alpn_protocol']
2975                    self.assertEqual(client_result, expected,
2976                                     msg % (client_result, "client"))
2977                    server_result = stats['server_alpn_protocols'][-1] \
2978                        if len(stats['server_alpn_protocols']) else 'nothing'
2979                    self.assertEqual(server_result, expected,
2980                                     msg % (server_result, "server"))
2981
2982        def test_selected_npn_protocol(self):
2983            # selected_npn_protocol() is None unless NPN is used
2984            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
2985            context.load_cert_chain(CERTFILE)
2986            stats = server_params_test(context, context,
2987                                       chatty=True, connectionchatty=True)
2988            self.assertIs(stats['client_npn_protocol'], None)
2989
2990        @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test")
2991        def test_npn_protocols(self):
2992            server_protocols = ['http/1.1', 'spdy/2']
2993            protocol_tests = [
2994                (['http/1.1', 'spdy/2'], 'http/1.1'),
2995                (['spdy/2', 'http/1.1'], 'http/1.1'),
2996                (['spdy/2', 'test'], 'spdy/2'),
2997                (['abc', 'def'], 'abc')
2998            ]
2999            for client_protocols, expected in protocol_tests:
3000                server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3001                server_context.load_cert_chain(CERTFILE)
3002                server_context.set_npn_protocols(server_protocols)
3003                client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3004                client_context.load_cert_chain(CERTFILE)
3005                client_context.set_npn_protocols(client_protocols)
3006                stats = server_params_test(client_context, server_context,
3007                                           chatty=True, connectionchatty=True)
3008
3009                msg = "failed trying %s (s) and %s (c).\n" \
3010                      "was expecting %s, but got %%s from the %%s" \
3011                          % (str(server_protocols), str(client_protocols),
3012                             str(expected))
3013                client_result = stats['client_npn_protocol']
3014                self.assertEqual(client_result, expected, msg % (client_result, "client"))
3015                server_result = stats['server_npn_protocols'][-1] \
3016                    if len(stats['server_npn_protocols']) else 'nothing'
3017                self.assertEqual(server_result, expected, msg % (server_result, "server"))
3018
3019        def sni_contexts(self):
3020            server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3021            server_context.load_cert_chain(SIGNED_CERTFILE)
3022            other_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3023            other_context.load_cert_chain(SIGNED_CERTFILE2)
3024            client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
3025            client_context.verify_mode = ssl.CERT_REQUIRED
3026            client_context.load_verify_locations(SIGNING_CA)
3027            return server_context, other_context, client_context
3028
3029        def check_common_name(self, stats, name):
3030            cert = stats['peercert']
3031            self.assertIn((('commonName', name),), cert['subject'])
3032
3033        @needs_sni
3034        def test_sni_callback(self):
3035            calls = []
3036            server_context, other_context, client_context = self.sni_contexts()
3037
3038            def servername_cb(ssl_sock, server_name, initial_context):
3039                calls.append((server_name, initial_context))
3040                if server_name is not None:
3041                    ssl_sock.context = other_context
3042            server_context.set_servername_callback(servername_cb)
3043
3044            stats = server_params_test(client_context, server_context,
3045                                       chatty=True,
3046                                       sni_name='supermessage')
3047            # The hostname was fetched properly, and the certificate was
3048            # changed for the connection.
3049            self.assertEqual(calls, [("supermessage", server_context)])
3050            # CERTFILE4 was selected
3051            self.check_common_name(stats, 'fakehostname')
3052
3053            calls = []
3054            # The callback is called with server_name=None
3055            stats = server_params_test(client_context, server_context,
3056                                       chatty=True,
3057                                       sni_name=None)
3058            self.assertEqual(calls, [(None, server_context)])
3059            self.check_common_name(stats, 'localhost')
3060
3061            # Check disabling the callback
3062            calls = []
3063            server_context.set_servername_callback(None)
3064
3065            stats = server_params_test(client_context, server_context,
3066                                       chatty=True,
3067                                       sni_name='notfunny')
3068            # Certificate didn't change
3069            self.check_common_name(stats, 'localhost')
3070            self.assertEqual(calls, [])
3071
3072        @needs_sni
3073        def test_sni_callback_alert(self):
3074            # Returning a TLS alert is reflected to the connecting client
3075            server_context, other_context, client_context = self.sni_contexts()
3076
3077            def cb_returning_alert(ssl_sock, server_name, initial_context):
3078                return ssl.ALERT_DESCRIPTION_ACCESS_DENIED
3079            server_context.set_servername_callback(cb_returning_alert)
3080
3081            with self.assertRaises(ssl.SSLError) as cm:
3082                stats = server_params_test(client_context, server_context,
3083                                           chatty=False,
3084                                           sni_name='supermessage')
3085            self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED')
3086
3087        @needs_sni
3088        def test_sni_callback_raising(self):
3089            # Raising fails the connection with a TLS handshake failure alert.
3090            server_context, other_context, client_context = self.sni_contexts()
3091
3092            def cb_raising(ssl_sock, server_name, initial_context):
3093                1.0/0.0
3094            server_context.set_servername_callback(cb_raising)
3095
3096            with self.assertRaises(ssl.SSLError) as cm, \
3097                 support.captured_stderr() as stderr:
3098                stats = server_params_test(client_context, server_context,
3099                                           chatty=False,
3100                                           sni_name='supermessage')
3101            self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE')
3102            self.assertIn("ZeroDivisionError", stderr.getvalue())
3103
3104        @needs_sni
3105        def test_sni_callback_wrong_return_type(self):
3106            # Returning the wrong return type terminates the TLS connection
3107            # with an internal error alert.
3108            server_context, other_context, client_context = self.sni_contexts()
3109
3110            def cb_wrong_return_type(ssl_sock, server_name, initial_context):
3111                return "foo"
3112            server_context.set_servername_callback(cb_wrong_return_type)
3113
3114            with self.assertRaises(ssl.SSLError) as cm, \
3115                 support.captured_stderr() as stderr:
3116                stats = server_params_test(client_context, server_context,
3117                                           chatty=False,
3118                                           sni_name='supermessage')
3119            self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR')
3120            self.assertIn("TypeError", stderr.getvalue())
3121
3122        def test_read_write_after_close_raises_valuerror(self):
3123            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
3124            context.verify_mode = ssl.CERT_REQUIRED
3125            context.load_verify_locations(CERTFILE)
3126            context.load_cert_chain(CERTFILE)
3127            server = ThreadedEchoServer(context=context, chatty=False)
3128
3129            with server:
3130                s = context.wrap_socket(socket.socket())
3131                s.connect((HOST, server.port))
3132                s.close()
3133
3134                self.assertRaises(ValueError, s.read, 1024)
3135                self.assertRaises(ValueError, s.write, b'hello')
3136
3137
def _describe_platform():
    """Return a printable description of the platform the tests run on."""
    probes = {
        'Linux': platform.linux_distribution,
        'Mac': platform.mac_ver,
        'Windows': platform.win32_ver,
    }
    for label, probe in probes.items():
        info = probe()
        # The first probe reporting a non-empty first field wins.
        if info and info[0]:
            return '%s %r' % (label, info)
    return repr(platform.platform())


def test_main(verbose=False):
    """Run the test_ssl test classes through support.run_unittest().

    When support.verbose is set, a banner describing the OpenSSL build and
    the platform is printed first.  The run is aborted with
    support.TestFailed if one of the required certificate/key files is
    missing.  The *verbose* argument itself is not used by this function.
    """
    if support.verbose:
        plat = _describe_platform()
        print("test_ssl: testing with %r %r" %
            (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO))
        print("          under %s" % plat)
        print("          HAS_SNI = %r" % ssl.HAS_SNI)
        print("          OP_ALL = 0x%8x" % ssl.OP_ALL)
        # OP_NO_TLSv1_1 may be missing from the ssl module.
        try:
            no_tls_1_1 = ssl.OP_NO_TLSv1_1
        except AttributeError:
            pass
        else:
            print("          OP_NO_TLSv1_1 = 0x%8x" % no_tls_1_1)

    required_files = [
        CERTFILE, REMOTE_ROOT_CERT, BYTES_CERTFILE,
        ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY,
        SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA,
        BADCERT, BADKEY, EMPTYCERT]
    for path in required_files:
        if os.path.exists(path):
            continue
        raise support.TestFailed("Can't read certificate file %r" % path)

    tests = [ContextTests, BasicTests, BasicSocketTests, SSLErrorTests]

    # Tests talking to remote servers only run with the 'network' resource.
    if support.is_resource_enabled('network'):
        tests.append(NetworkedTests)

    if _have_threads:
        thread_info = support.threading_setup()
        if thread_info:
            tests.append(ThreadedTests)

    try:
        support.run_unittest(*tests)
    finally:
        if _have_threads:
            support.threading_cleanup(*thread_info)

if __name__ == "__main__":
    test_main()
3188