1# -*- coding: utf-8 -*- 2# Test the support for SSL and sockets 3 4import sys 5import unittest 6from test import test_support as support 7from test.script_helper import assert_python_ok 8import asyncore 9import socket 10import select 11import time 12import datetime 13import gc 14import os 15import errno 16import pprint 17import tempfile 18import urllib2 19import traceback 20import weakref 21import platform 22import functools 23from contextlib import closing 24 25ssl = support.import_module("ssl") 26 27PROTOCOLS = sorted(ssl._PROTOCOL_NAMES) 28HOST = support.HOST 29IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL') 30IS_OPENSSL_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0) 31 32 33def data_file(*name): 34 return os.path.join(os.path.dirname(__file__), *name) 35 36# The custom key and certificate files used in test_ssl are generated 37# using Lib/test/make_ssl_certs.py. 38# Other certificates are simply fetched from the Internet servers they 39# are meant to authenticate. 40 41CERTFILE = data_file("keycert.pem") 42BYTES_CERTFILE = CERTFILE.encode(sys.getfilesystemencoding()) 43ONLYCERT = data_file("ssl_cert.pem") 44ONLYKEY = data_file("ssl_key.pem") 45BYTES_ONLYCERT = ONLYCERT.encode(sys.getfilesystemencoding()) 46BYTES_ONLYKEY = ONLYKEY.encode(sys.getfilesystemencoding()) 47CERTFILE_PROTECTED = data_file("keycert.passwd.pem") 48ONLYKEY_PROTECTED = data_file("ssl_key.passwd.pem") 49KEY_PASSWORD = "somepass" 50CAPATH = data_file("capath") 51BYTES_CAPATH = CAPATH.encode(sys.getfilesystemencoding()) 52CAFILE_NEURONIO = data_file("capath", "4e1295a3.0") 53CAFILE_CACERT = data_file("capath", "5ed36f99.0") 54 55 56# empty CRL 57CRLFILE = data_file("revocation.crl") 58 59# Two keys and certs signed by the same CA (for SNI tests) 60SIGNED_CERTFILE = data_file("keycert3.pem") 61SIGNED_CERTFILE2 = data_file("keycert4.pem") 62SIGNING_CA = data_file("pycacert.pem") 63# cert with all kinds of subject alt names 64ALLSANFILE = data_file("allsans.pem") 65 66REMOTE_HOST = "self-signed.pythontest.net" 67REMOTE_ROOT_CERT = data_file("selfsigned_pythontestdotnet.pem") 68 69EMPTYCERT = data_file("nullcert.pem") 70BADCERT = data_file("badcert.pem") 71NONEXISTINGCERT = data_file("XXXnonexisting.pem") 72BADKEY = data_file("badkey.pem") 73NOKIACERT = data_file("nokia.pem") 74NULLBYTECERT = data_file("nullbytecert.pem") 75 76DHFILE = data_file("dh1024.pem") 77BYTES_DHFILE = DHFILE.encode(sys.getfilesystemencoding()) 78 79 80def handle_error(prefix): 81 exc_format = ' '.join(traceback.format_exception(*sys.exc_info())) 82 if support.verbose: 83 sys.stdout.write(prefix + exc_format) 84 85 86class BasicTests(unittest.TestCase): 87 88 def test_sslwrap_simple(self): 89 # A crude test for the legacy API 90 try: 91 ssl.sslwrap_simple(socket.socket(socket.AF_INET)) 92 except IOError, e: 93 if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that 94 pass 95 else: 96 raise 97 try: 98 ssl.sslwrap_simple(socket.socket(socket.AF_INET)._sock) 99 except IOError, e: 100 if e.errno == 32: # broken pipe when ssl_sock.do_handshake(), this test doesn't care about that 101 pass 102 else: 103 raise 104 105 106def can_clear_options(): 107 # 0.9.8m or higher 108 return ssl._OPENSSL_API_VERSION >= (0, 9, 8, 13, 15) 109 110def no_sslv2_implies_sslv3_hello(): 111 # 0.9.7h or higher 112 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 7, 8, 15) 113 114def have_verify_flags(): 115 # 0.9.8 or higher 116 return ssl.OPENSSL_VERSION_INFO >= (0, 9, 8, 0, 15) 117 118def utc_offset(): #NOTE: ignore issues like #1647654 119 # local time = utc time + utc offset 120 if time.daylight and time.localtime().tm_isdst > 0: 121 return -time.altzone # seconds 122 return -time.timezone 123 124def asn1time(cert_time): 125 # Some versions of OpenSSL ignore seconds, see #18207 126 # 0.9.8.i 127 if ssl._OPENSSL_API_VERSION == (0, 9, 8, 9, 15): 128 fmt = "%b %d %H:%M:%S %Y GMT" 129 dt = datetime.datetime.strptime(cert_time, fmt) 130 dt = dt.replace(second=0) 131 cert_time = dt.strftime(fmt) 132 # %d adds leading zero but ASN1_TIME_print() uses leading space 133 if cert_time[4] == "0": 134 cert_time = cert_time[:4] + " " + cert_time[5:] 135 136 return cert_time 137 138# Issue #9415: Ubuntu hijacks their OpenSSL and forcefully disables SSLv2 139def skip_if_broken_ubuntu_ssl(func): 140 if hasattr(ssl, 'PROTOCOL_SSLv2'): 141 @functools.wraps(func) 142 def f(*args, **kwargs): 143 try: 144 ssl.SSLContext(ssl.PROTOCOL_SSLv2) 145 except ssl.SSLError: 146 if (ssl.OPENSSL_VERSION_INFO == (0, 9, 8, 15, 15) and 147 platform.linux_distribution() == ('debian', 'squeeze/sid', '')): 148 raise unittest.SkipTest("Patched Ubuntu OpenSSL breaks behaviour") 149 return func(*args, **kwargs) 150 return f 151 else: 152 return func 153 154needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test") 155 156 157class BasicSocketTests(unittest.TestCase): 158 159 def test_constants(self): 160 ssl.CERT_NONE 161 ssl.CERT_OPTIONAL 162 ssl.CERT_REQUIRED 163 ssl.OP_CIPHER_SERVER_PREFERENCE 164 ssl.OP_SINGLE_DH_USE 165 if ssl.HAS_ECDH: 166 ssl.OP_SINGLE_ECDH_USE 167 if ssl.OPENSSL_VERSION_INFO >= (1, 0): 168 ssl.OP_NO_COMPRESSION 169 self.assertIn(ssl.HAS_SNI, {True, False}) 170 self.assertIn(ssl.HAS_ECDH, {True, False}) 171 172 def test_random(self): 173 v = ssl.RAND_status() 174 if support.verbose: 175 sys.stdout.write("\n RAND_status is %d (%s)\n" 176 % (v, (v and "sufficient randomness") or 177 "insufficient randomness")) 178 if hasattr(ssl, 'RAND_egd'): 179 self.assertRaises(TypeError, ssl.RAND_egd, 1) 180 self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1) 181 ssl.RAND_add("this is a random string", 75.0) 182 183 def test_parse_cert(self): 184 # note that this uses an 'unofficial' function in _ssl.c, 185 # provided solely for this test, to exercise the certificate 186 # parsing code 187 p = ssl._ssl._test_decode_cert(CERTFILE) 188 if support.verbose: 189 sys.stdout.write("\n" + pprint.pformat(p) + "\n") 190 self.assertEqual(p['issuer'], 191 ((('countryName', 'XY'),), 192 (('localityName', 'Castle Anthrax'),), 193 (('organizationName', 'Python Software Foundation'),), 194 (('commonName', 'localhost'),)) 195 ) 196 # Note the next three asserts will fail if the keys are regenerated 197 self.assertEqual(p['notAfter'], asn1time('Oct 5 23:01:56 2020 GMT')) 198 self.assertEqual(p['notBefore'], asn1time('Oct 8 23:01:56 2010 GMT')) 199 self.assertEqual(p['serialNumber'], 'D7C7381919AFC24E') 200 self.assertEqual(p['subject'], 201 ((('countryName', 'XY'),), 202 (('localityName', 'Castle Anthrax'),), 203 (('organizationName', 'Python Software Foundation'),), 204 (('commonName', 'localhost'),)) 205 ) 206 self.assertEqual(p['subjectAltName'], (('DNS', 'localhost'),)) 207 # Issue #13034: the subjectAltName in some certificates 208 # (notably projects.developer.nokia.com:443) wasn't parsed 209 p = ssl._ssl._test_decode_cert(NOKIACERT) 210 if support.verbose: 211 sys.stdout.write("\n" + pprint.pformat(p) + "\n") 212 self.assertEqual(p['subjectAltName'], 213 (('DNS', 'projects.developer.nokia.com'), 214 ('DNS', 'projects.forum.nokia.com')) 215 ) 216 # extra OCSP and AIA fields 217 self.assertEqual(p['OCSP'], ('http://ocsp.verisign.com',)) 218 self.assertEqual(p['caIssuers'], 219 ('http://SVRIntl-G3-aia.verisign.com/SVRIntlG3.cer',)) 220 self.assertEqual(p['crlDistributionPoints'], 221 ('http://SVRIntl-G3-crl.verisign.com/SVRIntlG3.crl',)) 222 223 def test_parse_cert_CVE_2013_4238(self): 224 p = ssl._ssl._test_decode_cert(NULLBYTECERT) 225 if support.verbose: 226 sys.stdout.write("\n" + pprint.pformat(p) + "\n") 227 subject = ((('countryName', 'US'),), 228 (('stateOrProvinceName', 'Oregon'),), 229 (('localityName', 'Beaverton'),), 230 (('organizationName', 'Python Software Foundation'),), 231 (('organizationalUnitName', 'Python Core Development'),), 232 (('commonName', 'null.python.org\x00example.org'),), 233 (('emailAddress', 'python-dev@python.org'),)) 234 self.assertEqual(p['subject'], subject) 235 self.assertEqual(p['issuer'], subject) 236 if ssl._OPENSSL_API_VERSION >= (0, 9, 8): 237 san = (('DNS', 'altnull.python.org\x00example.com'), 238 ('email', 'null@python.org\x00user@example.org'), 239 ('URI', 'http://null.python.org\x00http://example.org'), 240 ('IP Address', '192.0.2.1'), 241 ('IP Address', '2001:DB8:0:0:0:0:0:1\n')) 242 else: 243 # OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName 244 san = (('DNS', 'altnull.python.org\x00example.com'), 245 ('email', 'null@python.org\x00user@example.org'), 246 ('URI', 'http://null.python.org\x00http://example.org'), 247 ('IP Address', '192.0.2.1'), 248 ('IP Address', '<invalid>')) 249 250 self.assertEqual(p['subjectAltName'], san) 251 252 def test_parse_all_sans(self): 253 p = ssl._ssl._test_decode_cert(ALLSANFILE) 254 self.assertEqual(p['subjectAltName'], 255 ( 256 ('DNS', 'allsans'), 257 ('othername', '<unsupported>'), 258 ('othername', '<unsupported>'), 259 ('email', 'user@example.org'), 260 ('DNS', 'www.example.org'), 261 ('DirName', 262 ((('countryName', 'XY'),), 263 (('localityName', 'Castle Anthrax'),), 264 (('organizationName', 'Python Software Foundation'),), 265 (('commonName', 'dirname example'),))), 266 ('URI', 'https://www.python.org/'), 267 ('IP Address', '127.0.0.1'), 268 ('IP Address', '0:0:0:0:0:0:0:1\n'), 269 ('Registered ID', '1.2.3.4.5') 270 ) 271 ) 272 273 def test_DER_to_PEM(self): 274 with open(CAFILE_CACERT, 'r') as f: 275 pem = f.read() 276 d1 = ssl.PEM_cert_to_DER_cert(pem) 277 p2 = ssl.DER_cert_to_PEM_cert(d1) 278 d2 = ssl.PEM_cert_to_DER_cert(p2) 279 self.assertEqual(d1, d2) 280 if not p2.startswith(ssl.PEM_HEADER + '\n'): 281 self.fail("DER-to-PEM didn't include correct header:\n%r\n" % p2) 282 if not p2.endswith('\n' + ssl.PEM_FOOTER + '\n'): 283 self.fail("DER-to-PEM didn't include correct footer:\n%r\n" % p2) 284 285 def test_openssl_version(self): 286 n = ssl.OPENSSL_VERSION_NUMBER 287 t = ssl.OPENSSL_VERSION_INFO 288 s = ssl.OPENSSL_VERSION 289 self.assertIsInstance(n, (int, long)) 290 self.assertIsInstance(t, tuple) 291 self.assertIsInstance(s, str) 292 # Some sanity checks follow 293 # >= 0.9 294 self.assertGreaterEqual(n, 0x900000) 295 # < 3.0 296 self.assertLess(n, 0x30000000) 297 major, minor, fix, patch, status = t 298 self.assertGreaterEqual(major, 0) 299 self.assertLess(major, 3) 300 self.assertGreaterEqual(minor, 0) 301 self.assertLess(minor, 256) 302 self.assertGreaterEqual(fix, 0) 303 self.assertLess(fix, 256) 304 self.assertGreaterEqual(patch, 0) 305 self.assertLessEqual(patch, 63) 306 self.assertGreaterEqual(status, 0) 307 self.assertLessEqual(status, 15) 308 # Version string as returned by {Open,Libre}SSL, the format might change 309 if IS_LIBRESSL: 310 self.assertTrue(s.startswith("LibreSSL {:d}".format(major)), 311 (s, t, hex(n))) 312 else: 313 self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)), 314 (s, t)) 315 316 @support.cpython_only 317 def test_refcycle(self): 318 # Issue #7943: an SSL object doesn't create reference cycles with 319 # itself. 320 s = socket.socket(socket.AF_INET) 321 ss = ssl.wrap_socket(s) 322 wr = weakref.ref(ss) 323 del ss 324 self.assertEqual(wr(), None) 325 326 def test_wrapped_unconnected(self): 327 # Methods on an unconnected SSLSocket propagate the original 328 # socket.error raise by the underlying socket object. 329 s = socket.socket(socket.AF_INET) 330 with closing(ssl.wrap_socket(s)) as ss: 331 self.assertRaises(socket.error, ss.recv, 1) 332 self.assertRaises(socket.error, ss.recv_into, bytearray(b'x')) 333 self.assertRaises(socket.error, ss.recvfrom, 1) 334 self.assertRaises(socket.error, ss.recvfrom_into, bytearray(b'x'), 1) 335 self.assertRaises(socket.error, ss.send, b'x') 336 self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0)) 337 338 def test_timeout(self): 339 # Issue #8524: when creating an SSL socket, the timeout of the 340 # original socket should be retained. 341 for timeout in (None, 0.0, 5.0): 342 s = socket.socket(socket.AF_INET) 343 s.settimeout(timeout) 344 with closing(ssl.wrap_socket(s)) as ss: 345 self.assertEqual(timeout, ss.gettimeout()) 346 347 def test_errors(self): 348 sock = socket.socket() 349 self.assertRaisesRegexp(ValueError, 350 "certfile must be specified", 351 ssl.wrap_socket, sock, keyfile=CERTFILE) 352 self.assertRaisesRegexp(ValueError, 353 "certfile must be specified for server-side operations", 354 ssl.wrap_socket, sock, server_side=True) 355 self.assertRaisesRegexp(ValueError, 356 "certfile must be specified for server-side operations", 357 ssl.wrap_socket, sock, server_side=True, certfile="") 358 with closing(ssl.wrap_socket(sock, server_side=True, certfile=CERTFILE)) as s: 359 self.assertRaisesRegexp(ValueError, "can't connect in server-side mode", 360 s.connect, (HOST, 8080)) 361 with self.assertRaises(IOError) as cm: 362 with closing(socket.socket()) as sock: 363 ssl.wrap_socket(sock, certfile=NONEXISTINGCERT) 364 self.assertEqual(cm.exception.errno, errno.ENOENT) 365 with self.assertRaises(IOError) as cm: 366 with closing(socket.socket()) as sock: 367 ssl.wrap_socket(sock, 368 certfile=CERTFILE, keyfile=NONEXISTINGCERT) 369 self.assertEqual(cm.exception.errno, errno.ENOENT) 370 with self.assertRaises(IOError) as cm: 371 with closing(socket.socket()) as sock: 372 ssl.wrap_socket(sock, 373 certfile=NONEXISTINGCERT, keyfile=NONEXISTINGCERT) 374 self.assertEqual(cm.exception.errno, errno.ENOENT) 375 376 def bad_cert_test(self, certfile): 377 """Check that trying to use the given client certificate fails""" 378 certfile = os.path.join(os.path.dirname(__file__) or os.curdir, 379 certfile) 380 sock = socket.socket() 381 self.addCleanup(sock.close) 382 with self.assertRaises(ssl.SSLError): 383 ssl.wrap_socket(sock, 384 certfile=certfile, 385 ssl_version=ssl.PROTOCOL_TLSv1) 386 387 def test_empty_cert(self): 388 """Wrapping with an empty cert file""" 389 self.bad_cert_test("nullcert.pem") 390 391 def test_malformed_cert(self): 392 """Wrapping with a badly formatted certificate (syntax error)""" 393 self.bad_cert_test("badcert.pem") 394 395 def test_malformed_key(self): 396 """Wrapping with a badly formatted key (syntax error)""" 397 self.bad_cert_test("badkey.pem") 398 399 def test_match_hostname(self): 400 def ok(cert, hostname): 401 ssl.match_hostname(cert, hostname) 402 def fail(cert, hostname): 403 self.assertRaises(ssl.CertificateError, 404 ssl.match_hostname, cert, hostname) 405 406 cert = {'subject': ((('commonName', 'example.com'),),)} 407 ok(cert, 'example.com') 408 ok(cert, 'ExAmple.cOm') 409 fail(cert, 'www.example.com') 410 fail(cert, '.example.com') 411 fail(cert, 'example.org') 412 fail(cert, 'exampleXcom') 413 414 cert = {'subject': ((('commonName', '*.a.com'),),)} 415 ok(cert, 'foo.a.com') 416 fail(cert, 'bar.foo.a.com') 417 fail(cert, 'a.com') 418 fail(cert, 'Xa.com') 419 fail(cert, '.a.com') 420 421 # only match one left-most wildcard 422 cert = {'subject': ((('commonName', 'f*.com'),),)} 423 ok(cert, 'foo.com') 424 ok(cert, 'f.com') 425 fail(cert, 'bar.com') 426 fail(cert, 'foo.a.com') 427 fail(cert, 'bar.foo.com') 428 429 # NULL bytes are bad, CVE-2013-4073 430 cert = {'subject': ((('commonName', 431 'null.python.org\x00example.org'),),)} 432 ok(cert, 'null.python.org\x00example.org') # or raise an error? 433 fail(cert, 'example.org') 434 fail(cert, 'null.python.org') 435 436 # error cases with wildcards 437 cert = {'subject': ((('commonName', '*.*.a.com'),),)} 438 fail(cert, 'bar.foo.a.com') 439 fail(cert, 'a.com') 440 fail(cert, 'Xa.com') 441 fail(cert, '.a.com') 442 443 cert = {'subject': ((('commonName', 'a.*.com'),),)} 444 fail(cert, 'a.foo.com') 445 fail(cert, 'a..com') 446 fail(cert, 'a.com') 447 448 # wildcard doesn't match IDNA prefix 'xn--' 449 idna = u'püthon.python.org'.encode("idna").decode("ascii") 450 cert = {'subject': ((('commonName', idna),),)} 451 ok(cert, idna) 452 cert = {'subject': ((('commonName', 'x*.python.org'),),)} 453 fail(cert, idna) 454 cert = {'subject': ((('commonName', 'xn--p*.python.org'),),)} 455 fail(cert, idna) 456 457 # wildcard in first fragment and IDNA A-labels in sequent fragments 458 # are supported. 459 idna = u'www*.pythön.org'.encode("idna").decode("ascii") 460 cert = {'subject': ((('commonName', idna),),)} 461 ok(cert, u'www.pythön.org'.encode("idna").decode("ascii")) 462 ok(cert, u'www1.pythön.org'.encode("idna").decode("ascii")) 463 fail(cert, u'ftp.pythön.org'.encode("idna").decode("ascii")) 464 fail(cert, u'pythön.org'.encode("idna").decode("ascii")) 465 466 # Slightly fake real-world example 467 cert = {'notAfter': 'Jun 26 21:41:46 2011 GMT', 468 'subject': ((('commonName', 'linuxfrz.org'),),), 469 'subjectAltName': (('DNS', 'linuxfr.org'), 470 ('DNS', 'linuxfr.com'), 471 ('othername', '<unsupported>'))} 472 ok(cert, 'linuxfr.org') 473 ok(cert, 'linuxfr.com') 474 # Not a "DNS" entry 475 fail(cert, '<unsupported>') 476 # When there is a subjectAltName, commonName isn't used 477 fail(cert, 'linuxfrz.org') 478 479 # A pristine real-world example 480 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT', 481 'subject': ((('countryName', 'US'),), 482 (('stateOrProvinceName', 'California'),), 483 (('localityName', 'Mountain View'),), 484 (('organizationName', 'Google Inc'),), 485 (('commonName', 'mail.google.com'),))} 486 ok(cert, 'mail.google.com') 487 fail(cert, 'gmail.com') 488 # Only commonName is considered 489 fail(cert, 'California') 490 491 # Neither commonName nor subjectAltName 492 cert = {'notAfter': 'Dec 18 23:59:59 2011 GMT', 493 'subject': ((('countryName', 'US'),), 494 (('stateOrProvinceName', 'California'),), 495 (('localityName', 'Mountain View'),), 496 (('organizationName', 'Google Inc'),))} 497 fail(cert, 'mail.google.com') 498 499 # No DNS entry in subjectAltName but a commonName 500 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT', 501 'subject': ((('countryName', 'US'),), 502 (('stateOrProvinceName', 'California'),), 503 (('localityName', 'Mountain View'),), 504 (('commonName', 'mail.google.com'),)), 505 'subjectAltName': (('othername', 'blabla'), )} 506 ok(cert, 'mail.google.com') 507 508 # No DNS entry subjectAltName and no commonName 509 cert = {'notAfter': 'Dec 18 23:59:59 2099 GMT', 510 'subject': ((('countryName', 'US'),), 511 (('stateOrProvinceName', 'California'),), 512 (('localityName', 'Mountain View'),), 513 (('organizationName', 'Google Inc'),)), 514 'subjectAltName': (('othername', 'blabla'),)} 515 fail(cert, 'google.com') 516 517 # Empty cert / no cert 518 self.assertRaises(ValueError, ssl.match_hostname, None, 'example.com') 519 self.assertRaises(ValueError, ssl.match_hostname, {}, 'example.com') 520 521 # Issue #17980: avoid denials of service by refusing more than one 522 # wildcard per fragment. 523 cert = {'subject': ((('commonName', 'a*b.com'),),)} 524 ok(cert, 'axxb.com') 525 cert = {'subject': ((('commonName', 'a*b.co*'),),)} 526 fail(cert, 'axxb.com') 527 cert = {'subject': ((('commonName', 'a*b*.com'),),)} 528 with self.assertRaises(ssl.CertificateError) as cm: 529 ssl.match_hostname(cert, 'axxbxxc.com') 530 self.assertIn("too many wildcards", str(cm.exception)) 531 532 def test_server_side(self): 533 # server_hostname doesn't work for server sockets 534 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 535 with closing(socket.socket()) as sock: 536 self.assertRaises(ValueError, ctx.wrap_socket, sock, True, 537 server_hostname="some.hostname") 538 539 def test_unknown_channel_binding(self): 540 # should raise ValueError for unknown type 541 s = socket.socket(socket.AF_INET) 542 with closing(ssl.wrap_socket(s)) as ss: 543 with self.assertRaises(ValueError): 544 ss.get_channel_binding("unknown-type") 545 546 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES, 547 "'tls-unique' channel binding not available") 548 def test_tls_unique_channel_binding(self): 549 # unconnected should return None for known type 550 s = socket.socket(socket.AF_INET) 551 with closing(ssl.wrap_socket(s)) as ss: 552 self.assertIsNone(ss.get_channel_binding("tls-unique")) 553 # the same for server-side 554 s = socket.socket(socket.AF_INET) 555 with closing(ssl.wrap_socket(s, server_side=True, certfile=CERTFILE)) as ss: 556 self.assertIsNone(ss.get_channel_binding("tls-unique")) 557 558 def test_get_default_verify_paths(self): 559 paths = ssl.get_default_verify_paths() 560 self.assertEqual(len(paths), 6) 561 self.assertIsInstance(paths, ssl.DefaultVerifyPaths) 562 563 with support.EnvironmentVarGuard() as env: 564 env["SSL_CERT_DIR"] = CAPATH 565 env["SSL_CERT_FILE"] = CERTFILE 566 paths = ssl.get_default_verify_paths() 567 self.assertEqual(paths.cafile, CERTFILE) 568 self.assertEqual(paths.capath, CAPATH) 569 570 @unittest.skipUnless(sys.platform == "win32", "Windows specific") 571 def test_enum_certificates(self): 572 self.assertTrue(ssl.enum_certificates("CA")) 573 self.assertTrue(ssl.enum_certificates("ROOT")) 574 575 self.assertRaises(TypeError, ssl.enum_certificates) 576 self.assertRaises(WindowsError, ssl.enum_certificates, "") 577 578 trust_oids = set() 579 for storename in ("CA", "ROOT"): 580 store = ssl.enum_certificates(storename) 581 self.assertIsInstance(store, list) 582 for element in store: 583 self.assertIsInstance(element, tuple) 584 self.assertEqual(len(element), 3) 585 cert, enc, trust = element 586 self.assertIsInstance(cert, bytes) 587 self.assertIn(enc, {"x509_asn", "pkcs_7_asn"}) 588 self.assertIsInstance(trust, (set, bool)) 589 if isinstance(trust, set): 590 trust_oids.update(trust) 591 592 serverAuth = "1.3.6.1.5.5.7.3.1" 593 self.assertIn(serverAuth, trust_oids) 594 595 @unittest.skipUnless(sys.platform == "win32", "Windows specific") 596 def test_enum_crls(self): 597 self.assertTrue(ssl.enum_crls("CA")) 598 self.assertRaises(TypeError, ssl.enum_crls) 599 self.assertRaises(WindowsError, ssl.enum_crls, "") 600 601 crls = ssl.enum_crls("CA") 602 self.assertIsInstance(crls, list) 603 for element in crls: 604 self.assertIsInstance(element, tuple) 605 self.assertEqual(len(element), 2) 606 self.assertIsInstance(element[0], bytes) 607 self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"}) 608 609 610 def test_asn1object(self): 611 expected = (129, 'serverAuth', 'TLS Web Server Authentication', 612 '1.3.6.1.5.5.7.3.1') 613 614 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1') 615 self.assertEqual(val, expected) 616 self.assertEqual(val.nid, 129) 617 self.assertEqual(val.shortname, 'serverAuth') 618 self.assertEqual(val.longname, 'TLS Web Server Authentication') 619 self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1') 620 self.assertIsInstance(val, ssl._ASN1Object) 621 self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth') 622 623 val = ssl._ASN1Object.fromnid(129) 624 self.assertEqual(val, expected) 625 self.assertIsInstance(val, ssl._ASN1Object) 626 self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1) 627 with self.assertRaisesRegexp(ValueError, "unknown NID 100000"): 628 ssl._ASN1Object.fromnid(100000) 629 for i in range(1000): 630 try: 631 obj = ssl._ASN1Object.fromnid(i) 632 except ValueError: 633 pass 634 else: 635 self.assertIsInstance(obj.nid, int) 636 self.assertIsInstance(obj.shortname, str) 637 self.assertIsInstance(obj.longname, str) 638 self.assertIsInstance(obj.oid, (str, type(None))) 639 640 val = ssl._ASN1Object.fromname('TLS Web Server Authentication') 641 self.assertEqual(val, expected) 642 self.assertIsInstance(val, ssl._ASN1Object) 643 self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected) 644 self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'), 645 expected) 646 with self.assertRaisesRegexp(ValueError, "unknown object 'serverauth'"): 647 ssl._ASN1Object.fromname('serverauth') 648 649 def test_purpose_enum(self): 650 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1') 651 self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object) 652 self.assertEqual(ssl.Purpose.SERVER_AUTH, val) 653 self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129) 654 self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth') 655 self.assertEqual(ssl.Purpose.SERVER_AUTH.oid, 656 '1.3.6.1.5.5.7.3.1') 657 658 val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2') 659 self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object) 660 self.assertEqual(ssl.Purpose.CLIENT_AUTH, val) 661 self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130) 662 self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth') 663 self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid, 664 '1.3.6.1.5.5.7.3.2') 665 666 def test_unsupported_dtls(self): 667 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 668 self.addCleanup(s.close) 669 with self.assertRaises(NotImplementedError) as cx: 670 ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE) 671 self.assertEqual(str(cx.exception), "only stream sockets are supported") 672 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 673 with self.assertRaises(NotImplementedError) as cx: 674 ctx.wrap_socket(s) 675 self.assertEqual(str(cx.exception), "only stream sockets are supported") 676 677 def cert_time_ok(self, timestring, timestamp): 678 self.assertEqual(ssl.cert_time_to_seconds(timestring), timestamp) 679 680 def cert_time_fail(self, timestring): 681 with self.assertRaises(ValueError): 682 ssl.cert_time_to_seconds(timestring) 683 684 @unittest.skipUnless(utc_offset(), 685 'local time needs to be different from UTC') 686 def test_cert_time_to_seconds_timezone(self): 687 # Issue #19940: ssl.cert_time_to_seconds() returns wrong 688 # results if local timezone is not UTC 689 self.cert_time_ok("May 9 00:00:00 2007 GMT", 1178668800.0) 690 self.cert_time_ok("Jan 5 09:34:43 2018 GMT", 1515144883.0) 691 692 def test_cert_time_to_seconds(self): 693 timestring = "Jan 5 09:34:43 2018 GMT" 694 ts = 1515144883.0 695 self.cert_time_ok(timestring, ts) 696 # accept keyword parameter, assert its name 697 self.assertEqual(ssl.cert_time_to_seconds(cert_time=timestring), ts) 698 # accept both %e and %d (space or zero generated by strftime) 699 self.cert_time_ok("Jan 05 09:34:43 2018 GMT", ts) 700 # case-insensitive 701 self.cert_time_ok("JaN 5 09:34:43 2018 GmT", ts) 702 self.cert_time_fail("Jan 5 09:34 2018 GMT") # no seconds 703 self.cert_time_fail("Jan 5 09:34:43 2018") # no GMT 704 self.cert_time_fail("Jan 5 09:34:43 2018 UTC") # not GMT timezone 705 self.cert_time_fail("Jan 35 09:34:43 2018 GMT") # invalid day 706 self.cert_time_fail("Jon 5 09:34:43 2018 GMT") # invalid month 707 self.cert_time_fail("Jan 5 24:00:00 2018 GMT") # invalid hour 708 self.cert_time_fail("Jan 5 09:60:43 2018 GMT") # invalid minute 709 710 newyear_ts = 1230768000.0 711 # leap seconds 712 self.cert_time_ok("Dec 31 23:59:60 2008 GMT", newyear_ts) 713 # same timestamp 714 self.cert_time_ok("Jan 1 00:00:00 2009 GMT", newyear_ts) 715 716 self.cert_time_ok("Jan 5 09:34:59 2018 GMT", 1515144899) 717 # allow 60th second (even if it is not a leap second) 718 self.cert_time_ok("Jan 5 09:34:60 2018 GMT", 1515144900) 719 # allow 2nd leap second for compatibility with time.strptime() 720 self.cert_time_ok("Jan 5 09:34:61 2018 GMT", 1515144901) 721 self.cert_time_fail("Jan 5 09:34:62 2018 GMT") # invalid seconds 722 723 # no special treatement for the special value: 724 # 99991231235959Z (rfc 5280) 725 self.cert_time_ok("Dec 31 23:59:59 9999 GMT", 253402300799.0) 726 727 @support.run_with_locale('LC_ALL', '') 728 def test_cert_time_to_seconds_locale(self): 729 # `cert_time_to_seconds()` should be locale independent 730 731 def local_february_name(): 732 return time.strftime('%b', (1, 2, 3, 4, 5, 6, 0, 0, 0)) 733 734 if local_february_name().lower() == 'feb': 735 self.skipTest("locale-specific month name needs to be " 736 "different from C locale") 737 738 # locale-independent 739 self.cert_time_ok("Feb 9 00:00:00 2007 GMT", 1170979200.0) 740 self.cert_time_fail(local_february_name() + " 9 00:00:00 2007 GMT") 741 742 743class ContextTests(unittest.TestCase): 744 745 @skip_if_broken_ubuntu_ssl 746 def test_constructor(self): 747 for protocol in PROTOCOLS: 748 ssl.SSLContext(protocol) 749 self.assertRaises(TypeError, ssl.SSLContext) 750 self.assertRaises(ValueError, ssl.SSLContext, -1) 751 self.assertRaises(ValueError, ssl.SSLContext, 42) 752 753 @skip_if_broken_ubuntu_ssl 754 def test_protocol(self): 755 for proto in PROTOCOLS: 756 ctx = ssl.SSLContext(proto) 757 self.assertEqual(ctx.protocol, proto) 758 759 def test_ciphers(self): 760 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 761 ctx.set_ciphers("ALL") 762 ctx.set_ciphers("DEFAULT") 763 with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"): 764 ctx.set_ciphers("^$:,;?*'dorothyx") 765 766 @skip_if_broken_ubuntu_ssl 767 def test_options(self): 768 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 769 # OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value 770 default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3) 771 if not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0): 772 default |= ssl.OP_NO_COMPRESSION 773 self.assertEqual(default, ctx.options) 774 ctx.options |= ssl.OP_NO_TLSv1 775 self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options) 776 if can_clear_options(): 777 ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1) 778 self.assertEqual(default, ctx.options) 779 ctx.options = 0 780 self.assertEqual(0, ctx.options) 781 else: 782 with self.assertRaises(ValueError): 783 ctx.options = 0 784 785 def test_verify_mode(self): 786 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 787 # Default value 788 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) 789 ctx.verify_mode = ssl.CERT_OPTIONAL 790 self.assertEqual(ctx.verify_mode, ssl.CERT_OPTIONAL) 791 ctx.verify_mode = ssl.CERT_REQUIRED 792 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) 793 ctx.verify_mode = ssl.CERT_NONE 794 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) 795 with self.assertRaises(TypeError): 796 ctx.verify_mode = None 797 with self.assertRaises(ValueError): 798 ctx.verify_mode = 42 799 800 @unittest.skipUnless(have_verify_flags(), 801 "verify_flags need OpenSSL > 0.9.8") 802 def test_verify_flags(self): 803 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 804 # default value 805 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0) 806 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT | tf) 807 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF 808 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_LEAF) 809 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_CHAIN 810 self.assertEqual(ctx.verify_flags, ssl.VERIFY_CRL_CHECK_CHAIN) 811 ctx.verify_flags = ssl.VERIFY_DEFAULT 812 self.assertEqual(ctx.verify_flags, ssl.VERIFY_DEFAULT) 813 # supports any value 814 ctx.verify_flags = ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT 815 self.assertEqual(ctx.verify_flags, 816 ssl.VERIFY_CRL_CHECK_LEAF | ssl.VERIFY_X509_STRICT) 817 with self.assertRaises(TypeError): 818 ctx.verify_flags = None 819 820 def test_load_cert_chain(self): 821 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 822 # Combined key and cert in a single file 823 ctx.load_cert_chain(CERTFILE, keyfile=None) 824 ctx.load_cert_chain(CERTFILE, keyfile=CERTFILE) 825 self.assertRaises(TypeError, ctx.load_cert_chain, keyfile=CERTFILE) 826 with self.assertRaises(IOError) as cm: 827 ctx.load_cert_chain(NONEXISTINGCERT) 828 self.assertEqual(cm.exception.errno, errno.ENOENT) 829 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"): 830 ctx.load_cert_chain(BADCERT) 831 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"): 832 ctx.load_cert_chain(EMPTYCERT) 833 # Separate key and cert 834 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 835 ctx.load_cert_chain(ONLYCERT, ONLYKEY) 836 ctx.load_cert_chain(certfile=ONLYCERT, keyfile=ONLYKEY) 837 ctx.load_cert_chain(certfile=BYTES_ONLYCERT, keyfile=BYTES_ONLYKEY) 838 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"): 839 ctx.load_cert_chain(ONLYCERT) 840 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"): 841 ctx.load_cert_chain(ONLYKEY) 842 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"): 843 ctx.load_cert_chain(certfile=ONLYKEY, keyfile=ONLYCERT) 844 # Mismatching key and cert 845 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 846 with self.assertRaisesRegexp(ssl.SSLError, "key values mismatch"): 847 ctx.load_cert_chain(CAFILE_CACERT, ONLYKEY) 848 # Password protected key and cert 849 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD) 850 ctx.load_cert_chain(CERTFILE_PROTECTED, password=KEY_PASSWORD.encode()) 851 ctx.load_cert_chain(CERTFILE_PROTECTED, 852 password=bytearray(KEY_PASSWORD.encode())) 853 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD) 854 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, KEY_PASSWORD.encode()) 855 ctx.load_cert_chain(ONLYCERT, ONLYKEY_PROTECTED, 856 bytearray(KEY_PASSWORD.encode())) 857 with self.assertRaisesRegexp(TypeError, "should be a string"): 858 ctx.load_cert_chain(CERTFILE_PROTECTED, password=True) 859 with self.assertRaises(ssl.SSLError): 860 ctx.load_cert_chain(CERTFILE_PROTECTED, password="badpass") 861 with self.assertRaisesRegexp(ValueError, "cannot be longer"): 862 # openssl has a fixed limit on the password buffer. 863 # PEM_BUFSIZE is generally set to 1kb. 864 # Return a string larger than this. 865 ctx.load_cert_chain(CERTFILE_PROTECTED, password=b'a' * 102400) 866 # Password callback 867 def getpass_unicode(): 868 return KEY_PASSWORD 869 def getpass_bytes(): 870 return KEY_PASSWORD.encode() 871 def getpass_bytearray(): 872 return bytearray(KEY_PASSWORD.encode()) 873 def getpass_badpass(): 874 return "badpass" 875 def getpass_huge(): 876 return b'a' * (1024 * 1024) 877 def getpass_bad_type(): 878 return 9 879 def getpass_exception(): 880 raise Exception('getpass error') 881 class GetPassCallable: 882 def __call__(self): 883 return KEY_PASSWORD 884 def getpass(self): 885 return KEY_PASSWORD 886 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_unicode) 887 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytes) 888 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bytearray) 889 ctx.load_cert_chain(CERTFILE_PROTECTED, password=GetPassCallable()) 890 ctx.load_cert_chain(CERTFILE_PROTECTED, 891 password=GetPassCallable().getpass) 892 with self.assertRaises(ssl.SSLError): 893 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_badpass) 894 with self.assertRaisesRegexp(ValueError, "cannot be longer"): 895 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_huge) 896 with self.assertRaisesRegexp(TypeError, "must return a string"): 897 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_bad_type) 898 with self.assertRaisesRegexp(Exception, "getpass error"): 899 ctx.load_cert_chain(CERTFILE_PROTECTED, password=getpass_exception) 900 # Make sure the password function isn't called if it isn't needed 901 ctx.load_cert_chain(CERTFILE, password=getpass_exception) 902 903 def test_load_verify_locations(self): 904 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 905 ctx.load_verify_locations(CERTFILE) 906 ctx.load_verify_locations(cafile=CERTFILE, capath=None) 907 ctx.load_verify_locations(BYTES_CERTFILE) 908 ctx.load_verify_locations(cafile=BYTES_CERTFILE, capath=None) 909 ctx.load_verify_locations(cafile=BYTES_CERTFILE.decode('utf-8')) 910 self.assertRaises(TypeError, ctx.load_verify_locations) 911 self.assertRaises(TypeError, ctx.load_verify_locations, None, None, None) 912 with self.assertRaises(IOError) as cm: 913 ctx.load_verify_locations(NONEXISTINGCERT) 914 self.assertEqual(cm.exception.errno, errno.ENOENT) 915 with self.assertRaises(IOError): 916 ctx.load_verify_locations(u'') 917 with self.assertRaisesRegexp(ssl.SSLError, "PEM lib"): 918 ctx.load_verify_locations(BADCERT) 919 ctx.load_verify_locations(CERTFILE, CAPATH) 920 ctx.load_verify_locations(CERTFILE, capath=BYTES_CAPATH) 921 922 # Issue #10989: crash if the second argument type is invalid 923 self.assertRaises(TypeError, ctx.load_verify_locations, None, True) 924 925 def test_load_verify_cadata(self): 926 # test cadata 927 with open(CAFILE_CACERT) as f: 928 cacert_pem = f.read().decode("ascii") 929 cacert_der = ssl.PEM_cert_to_DER_cert(cacert_pem) 930 with open(CAFILE_NEURONIO) as f: 931 neuronio_pem = f.read().decode("ascii") 932 neuronio_der = ssl.PEM_cert_to_DER_cert(neuronio_pem) 933 934 # test PEM 935 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 936 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 0) 937 ctx.load_verify_locations(cadata=cacert_pem) 938 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 1) 939 ctx.load_verify_locations(cadata=neuronio_pem) 940 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) 941 # cert already in hash table 942 ctx.load_verify_locations(cadata=neuronio_pem) 943 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) 944 945 # combined 946 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 947 combined = "\n".join((cacert_pem, neuronio_pem)) 948 ctx.load_verify_locations(cadata=combined) 949 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) 950 951 # with junk around the certs 952 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 953 combined = ["head", cacert_pem, "other", neuronio_pem, "again", 954 neuronio_pem, "tail"] 955 ctx.load_verify_locations(cadata="\n".join(combined)) 956 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) 957 958 # test DER 959 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 960 ctx.load_verify_locations(cadata=cacert_der) 961 ctx.load_verify_locations(cadata=neuronio_der) 962 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) 963 # cert already in hash table 964 ctx.load_verify_locations(cadata=cacert_der) 965 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) 966 967 # combined 968 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 969 combined = b"".join((cacert_der, neuronio_der)) 970 ctx.load_verify_locations(cadata=combined) 971 self.assertEqual(ctx.cert_store_stats()["x509_ca"], 2) 972 973 # error cases 974 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 975 self.assertRaises(TypeError, ctx.load_verify_locations, cadata=object) 976 977 with self.assertRaisesRegexp(ssl.SSLError, "no start line"): 978 ctx.load_verify_locations(cadata=u"broken") 979 with self.assertRaisesRegexp(ssl.SSLError, "not enough data"): 980 ctx.load_verify_locations(cadata=b"broken") 981 982 983 def test_load_dh_params(self): 984 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 985 ctx.load_dh_params(DHFILE) 986 if os.name != 'nt': 987 ctx.load_dh_params(BYTES_DHFILE) 988 self.assertRaises(TypeError, ctx.load_dh_params) 989 self.assertRaises(TypeError, ctx.load_dh_params, None) 990 with self.assertRaises(IOError) as cm: 991 ctx.load_dh_params(NONEXISTINGCERT) 992 self.assertEqual(cm.exception.errno, errno.ENOENT) 993 with self.assertRaises(ssl.SSLError) as cm: 994 ctx.load_dh_params(CERTFILE) 995 996 @skip_if_broken_ubuntu_ssl 997 def test_session_stats(self): 998 for proto in PROTOCOLS: 999 ctx = ssl.SSLContext(proto) 1000 self.assertEqual(ctx.session_stats(), { 1001 'number': 0, 1002 'connect': 0, 1003 'connect_good': 0, 1004 'connect_renegotiate': 0, 1005 'accept': 0, 1006 'accept_good': 0, 1007 'accept_renegotiate': 0, 1008 'hits': 0, 1009 'misses': 0, 1010 'timeouts': 0, 1011 'cache_full': 0, 1012 }) 1013 1014 def test_set_default_verify_paths(self): 1015 # There's not much we can do to test that it acts as expected, 1016 # so just check it doesn't crash or raise an exception. 1017 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1018 ctx.set_default_verify_paths() 1019 1020 @unittest.skipUnless(ssl.HAS_ECDH, "ECDH disabled on this OpenSSL build") 1021 def test_set_ecdh_curve(self): 1022 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1023 ctx.set_ecdh_curve("prime256v1") 1024 ctx.set_ecdh_curve(b"prime256v1") 1025 self.assertRaises(TypeError, ctx.set_ecdh_curve) 1026 self.assertRaises(TypeError, ctx.set_ecdh_curve, None) 1027 self.assertRaises(ValueError, ctx.set_ecdh_curve, "foo") 1028 self.assertRaises(ValueError, ctx.set_ecdh_curve, b"foo") 1029 1030 @needs_sni 1031 def test_sni_callback(self): 1032 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1033 1034 # set_servername_callback expects a callable, or None 1035 self.assertRaises(TypeError, ctx.set_servername_callback) 1036 self.assertRaises(TypeError, ctx.set_servername_callback, 4) 1037 self.assertRaises(TypeError, ctx.set_servername_callback, "") 1038 self.assertRaises(TypeError, ctx.set_servername_callback, ctx) 1039 1040 def dummycallback(sock, servername, ctx): 1041 pass 1042 ctx.set_servername_callback(None) 1043 ctx.set_servername_callback(dummycallback) 1044 1045 @needs_sni 1046 def test_sni_callback_refcycle(self): 1047 # Reference cycles through the servername callback are detected 1048 # and cleared. 1049 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1050 def dummycallback(sock, servername, ctx, cycle=ctx): 1051 pass 1052 ctx.set_servername_callback(dummycallback) 1053 wr = weakref.ref(ctx) 1054 del ctx, dummycallback 1055 gc.collect() 1056 self.assertIs(wr(), None) 1057 1058 def test_cert_store_stats(self): 1059 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1060 self.assertEqual(ctx.cert_store_stats(), 1061 {'x509_ca': 0, 'crl': 0, 'x509': 0}) 1062 ctx.load_cert_chain(CERTFILE) 1063 self.assertEqual(ctx.cert_store_stats(), 1064 {'x509_ca': 0, 'crl': 0, 'x509': 0}) 1065 ctx.load_verify_locations(CERTFILE) 1066 self.assertEqual(ctx.cert_store_stats(), 1067 {'x509_ca': 0, 'crl': 0, 'x509': 1}) 1068 ctx.load_verify_locations(CAFILE_CACERT) 1069 self.assertEqual(ctx.cert_store_stats(), 1070 {'x509_ca': 1, 'crl': 0, 'x509': 2}) 1071 1072 def test_get_ca_certs(self): 1073 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1074 self.assertEqual(ctx.get_ca_certs(), []) 1075 # CERTFILE is not flagged as X509v3 Basic Constraints: CA:TRUE 1076 ctx.load_verify_locations(CERTFILE) 1077 self.assertEqual(ctx.get_ca_certs(), []) 1078 # but CAFILE_CACERT is a CA cert 1079 ctx.load_verify_locations(CAFILE_CACERT) 1080 self.assertEqual(ctx.get_ca_certs(), 1081 [{'issuer': ((('organizationName', 'Root CA'),), 1082 (('organizationalUnitName', 'http://www.cacert.org'),), 1083 (('commonName', 'CA Cert Signing Authority'),), 1084 (('emailAddress', 'support@cacert.org'),)), 1085 'notAfter': asn1time('Mar 29 12:29:49 2033 GMT'), 1086 'notBefore': asn1time('Mar 30 12:29:49 2003 GMT'), 1087 'serialNumber': '00', 1088 'crlDistributionPoints': ('https://www.cacert.org/revoke.crl',), 1089 'subject': ((('organizationName', 'Root CA'),), 1090 (('organizationalUnitName', 'http://www.cacert.org'),), 1091 (('commonName', 'CA Cert Signing Authority'),), 1092 (('emailAddress', 'support@cacert.org'),)), 1093 'version': 3}]) 1094 1095 with open(CAFILE_CACERT) as f: 1096 pem = f.read() 1097 der = ssl.PEM_cert_to_DER_cert(pem) 1098 self.assertEqual(ctx.get_ca_certs(True), [der]) 1099 1100 def test_load_default_certs(self): 1101 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1102 ctx.load_default_certs() 1103 1104 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1105 ctx.load_default_certs(ssl.Purpose.SERVER_AUTH) 1106 ctx.load_default_certs() 1107 1108 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1109 ctx.load_default_certs(ssl.Purpose.CLIENT_AUTH) 1110 1111 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1112 self.assertRaises(TypeError, ctx.load_default_certs, None) 1113 self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH') 1114 1115 @unittest.skipIf(sys.platform == "win32", "not-Windows specific") 1116 @unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars") 1117 def test_load_default_certs_env(self): 1118 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1119 with support.EnvironmentVarGuard() as env: 1120 env["SSL_CERT_DIR"] = CAPATH 1121 env["SSL_CERT_FILE"] = CERTFILE 1122 ctx.load_default_certs() 1123 self.assertEqual(ctx.cert_store_stats(), {"crl": 0, "x509": 1, "x509_ca": 0}) 1124 1125 @unittest.skipUnless(sys.platform == "win32", "Windows specific") 1126 def test_load_default_certs_env_windows(self): 1127 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1128 ctx.load_default_certs() 1129 stats = ctx.cert_store_stats() 1130 1131 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1132 with support.EnvironmentVarGuard() as env: 1133 env["SSL_CERT_DIR"] = CAPATH 1134 env["SSL_CERT_FILE"] = CERTFILE 1135 ctx.load_default_certs() 1136 stats["x509"] += 1 1137 self.assertEqual(ctx.cert_store_stats(), stats) 1138 1139 def test_create_default_context(self): 1140 ctx = ssl.create_default_context() 1141 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23) 1142 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) 1143 self.assertTrue(ctx.check_hostname) 1144 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) 1145 self.assertEqual( 1146 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0), 1147 getattr(ssl, "OP_NO_COMPRESSION", 0), 1148 ) 1149 1150 with open(SIGNING_CA) as f: 1151 cadata = f.read().decode("ascii") 1152 ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH, 1153 cadata=cadata) 1154 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23) 1155 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) 1156 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) 1157 self.assertEqual( 1158 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0), 1159 getattr(ssl, "OP_NO_COMPRESSION", 0), 1160 ) 1161 1162 ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) 1163 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23) 1164 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) 1165 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) 1166 self.assertEqual( 1167 ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0), 1168 getattr(ssl, "OP_NO_COMPRESSION", 0), 1169 ) 1170 self.assertEqual( 1171 ctx.options & getattr(ssl, "OP_SINGLE_DH_USE", 0), 1172 getattr(ssl, "OP_SINGLE_DH_USE", 0), 1173 ) 1174 self.assertEqual( 1175 ctx.options & getattr(ssl, "OP_SINGLE_ECDH_USE", 0), 1176 getattr(ssl, "OP_SINGLE_ECDH_USE", 0), 1177 ) 1178 1179 def test__create_stdlib_context(self): 1180 ctx = ssl._create_stdlib_context() 1181 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23) 1182 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) 1183 self.assertFalse(ctx.check_hostname) 1184 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) 1185 1186 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1) 1187 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1) 1188 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) 1189 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) 1190 1191 ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1, 1192 cert_reqs=ssl.CERT_REQUIRED, 1193 check_hostname=True) 1194 self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1) 1195 self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) 1196 self.assertTrue(ctx.check_hostname) 1197 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) 1198 1199 ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH) 1200 self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23) 1201 self.assertEqual(ctx.verify_mode, ssl.CERT_NONE) 1202 self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2) 1203 1204 def test__https_verify_certificates(self): 1205 # Unit test to check the contect factory mapping 1206 # The factories themselves are tested above 1207 # This test will fail by design if run under PYTHONHTTPSVERIFY=0 1208 # (as will various test_httplib tests) 1209 1210 # Uses a fresh SSL module to avoid affecting the real one 1211 local_ssl = support.import_fresh_module("ssl") 1212 # Certificate verification is enabled by default 1213 self.assertIs(local_ssl._create_default_https_context, 1214 local_ssl.create_default_context) 1215 # Turn default verification off 1216 local_ssl._https_verify_certificates(enable=False) 1217 self.assertIs(local_ssl._create_default_https_context, 1218 local_ssl._create_unverified_context) 1219 # And back on 1220 local_ssl._https_verify_certificates(enable=True) 1221 self.assertIs(local_ssl._create_default_https_context, 1222 local_ssl.create_default_context) 1223 # The default behaviour is to enable 1224 local_ssl._https_verify_certificates(enable=False) 1225 local_ssl._https_verify_certificates() 1226 self.assertIs(local_ssl._create_default_https_context, 1227 local_ssl.create_default_context) 1228 1229 def test__https_verify_envvar(self): 1230 # Unit test to check the PYTHONHTTPSVERIFY handling 1231 # Need to use a subprocess so it can still be run under -E 1232 https_is_verified = """import ssl, sys; \ 1233 status = "Error: _create_default_https_context does not verify certs" \ 1234 if ssl._create_default_https_context is \ 1235 ssl._create_unverified_context \ 1236 else None; \ 1237 sys.exit(status)""" 1238 https_is_not_verified = """import ssl, sys; \ 1239 status = "Error: _create_default_https_context verifies certs" \ 1240 if ssl._create_default_https_context is \ 1241 ssl.create_default_context \ 1242 else None; \ 1243 sys.exit(status)""" 1244 extra_env = {} 1245 # Omitting it leaves verification on 1246 assert_python_ok("-c", https_is_verified, **extra_env) 1247 # Setting it to zero turns verification off 1248 extra_env[ssl._https_verify_envvar] = "0" 1249 assert_python_ok("-c", https_is_not_verified, **extra_env) 1250 # Any other value should also leave it on 1251 for setting in ("", "1", "enabled", "foo"): 1252 extra_env[ssl._https_verify_envvar] = setting 1253 assert_python_ok("-c", https_is_verified, **extra_env) 1254 1255 def test_check_hostname(self): 1256 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1257 self.assertFalse(ctx.check_hostname) 1258 1259 # Requires CERT_REQUIRED or CERT_OPTIONAL 1260 with self.assertRaises(ValueError): 1261 ctx.check_hostname = True 1262 ctx.verify_mode = ssl.CERT_REQUIRED 1263 self.assertFalse(ctx.check_hostname) 1264 ctx.check_hostname = True 1265 self.assertTrue(ctx.check_hostname) 1266 1267 ctx.verify_mode = ssl.CERT_OPTIONAL 1268 ctx.check_hostname = True 1269 self.assertTrue(ctx.check_hostname) 1270 1271 # Cannot set CERT_NONE with check_hostname enabled 1272 with self.assertRaises(ValueError): 1273 ctx.verify_mode = ssl.CERT_NONE 1274 ctx.check_hostname = False 1275 self.assertFalse(ctx.check_hostname) 1276 1277 1278class SSLErrorTests(unittest.TestCase): 1279 1280 def test_str(self): 1281 # The str() of a SSLError doesn't include the errno 1282 e = ssl.SSLError(1, "foo") 1283 self.assertEqual(str(e), "foo") 1284 self.assertEqual(e.errno, 1) 1285 # Same for a subclass 1286 e = ssl.SSLZeroReturnError(1, "foo") 1287 self.assertEqual(str(e), "foo") 1288 self.assertEqual(e.errno, 1) 1289 1290 def test_lib_reason(self): 1291 # Test the library and reason attributes 1292 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1293 with self.assertRaises(ssl.SSLError) as cm: 1294 ctx.load_dh_params(CERTFILE) 1295 self.assertEqual(cm.exception.library, 'PEM') 1296 self.assertEqual(cm.exception.reason, 'NO_START_LINE') 1297 s = str(cm.exception) 1298 self.assertTrue(s.startswith("[PEM: NO_START_LINE] no start line"), s) 1299 1300 def test_subclass(self): 1301 # Check that the appropriate SSLError subclass is raised 1302 # (this only tests one of them) 1303 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1304 with closing(socket.socket()) as s: 1305 s.bind(("127.0.0.1", 0)) 1306 s.listen(5) 1307 c = socket.socket() 1308 c.connect(s.getsockname()) 1309 c.setblocking(False) 1310 with closing(ctx.wrap_socket(c, False, do_handshake_on_connect=False)) as c: 1311 with self.assertRaises(ssl.SSLWantReadError) as cm: 1312 c.do_handshake() 1313 s = str(cm.exception) 1314 self.assertTrue(s.startswith("The operation did not complete (read)"), s) 1315 # For compatibility 1316 self.assertEqual(cm.exception.errno, ssl.SSL_ERROR_WANT_READ) 1317 1318 1319class NetworkedTests(unittest.TestCase): 1320 1321 def test_connect(self): 1322 with support.transient_internet(REMOTE_HOST): 1323 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 1324 cert_reqs=ssl.CERT_NONE) 1325 try: 1326 s.connect((REMOTE_HOST, 443)) 1327 self.assertEqual({}, s.getpeercert()) 1328 finally: 1329 s.close() 1330 1331 # this should fail because we have no verification certs 1332 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 1333 cert_reqs=ssl.CERT_REQUIRED) 1334 self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed", 1335 s.connect, (REMOTE_HOST, 443)) 1336 s.close() 1337 1338 # this should succeed because we specify the root cert 1339 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 1340 cert_reqs=ssl.CERT_REQUIRED, 1341 ca_certs=REMOTE_ROOT_CERT) 1342 try: 1343 s.connect((REMOTE_HOST, 443)) 1344 self.assertTrue(s.getpeercert()) 1345 finally: 1346 s.close() 1347 1348 def test_connect_ex(self): 1349 # Issue #11326: check connect_ex() implementation 1350 with support.transient_internet(REMOTE_HOST): 1351 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 1352 cert_reqs=ssl.CERT_REQUIRED, 1353 ca_certs=REMOTE_ROOT_CERT) 1354 try: 1355 self.assertEqual(0, s.connect_ex((REMOTE_HOST, 443))) 1356 self.assertTrue(s.getpeercert()) 1357 finally: 1358 s.close() 1359 1360 def test_non_blocking_connect_ex(self): 1361 # Issue #11326: non-blocking connect_ex() should allow handshake 1362 # to proceed after the socket gets ready. 1363 with support.transient_internet(REMOTE_HOST): 1364 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 1365 cert_reqs=ssl.CERT_REQUIRED, 1366 ca_certs=REMOTE_ROOT_CERT, 1367 do_handshake_on_connect=False) 1368 try: 1369 s.setblocking(False) 1370 rc = s.connect_ex((REMOTE_HOST, 443)) 1371 # EWOULDBLOCK under Windows, EINPROGRESS elsewhere 1372 self.assertIn(rc, (0, errno.EINPROGRESS, errno.EWOULDBLOCK)) 1373 # Wait for connect to finish 1374 select.select([], [s], [], 5.0) 1375 # Non-blocking handshake 1376 while True: 1377 try: 1378 s.do_handshake() 1379 break 1380 except ssl.SSLWantReadError: 1381 select.select([s], [], [], 5.0) 1382 except ssl.SSLWantWriteError: 1383 select.select([], [s], [], 5.0) 1384 # SSL established 1385 self.assertTrue(s.getpeercert()) 1386 finally: 1387 s.close() 1388 1389 def test_timeout_connect_ex(self): 1390 # Issue #12065: on a timeout, connect_ex() should return the original 1391 # errno (mimicking the behaviour of non-SSL sockets). 1392 with support.transient_internet(REMOTE_HOST): 1393 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 1394 cert_reqs=ssl.CERT_REQUIRED, 1395 ca_certs=REMOTE_ROOT_CERT, 1396 do_handshake_on_connect=False) 1397 try: 1398 s.settimeout(0.0000001) 1399 rc = s.connect_ex((REMOTE_HOST, 443)) 1400 if rc == 0: 1401 self.skipTest("REMOTE_HOST responded too quickly") 1402 self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK)) 1403 finally: 1404 s.close() 1405 1406 def test_connect_ex_error(self): 1407 with support.transient_internet(REMOTE_HOST): 1408 s = ssl.wrap_socket(socket.socket(socket.AF_INET), 1409 cert_reqs=ssl.CERT_REQUIRED, 1410 ca_certs=REMOTE_ROOT_CERT) 1411 try: 1412 rc = s.connect_ex((REMOTE_HOST, 444)) 1413 # Issue #19919: Windows machines or VMs hosted on Windows 1414 # machines sometimes return EWOULDBLOCK. 1415 errors = ( 1416 errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.ETIMEDOUT, 1417 errno.EWOULDBLOCK, 1418 ) 1419 self.assertIn(rc, errors) 1420 finally: 1421 s.close() 1422 1423 def test_connect_with_context(self): 1424 with support.transient_internet(REMOTE_HOST): 1425 # Same as test_connect, but with a separately created context 1426 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 1427 s = ctx.wrap_socket(socket.socket(socket.AF_INET)) 1428 s.connect((REMOTE_HOST, 443)) 1429 try: 1430 self.assertEqual({}, s.getpeercert()) 1431 finally: 1432 s.close() 1433 # Same with a server hostname 1434 s = ctx.wrap_socket(socket.socket(socket.AF_INET), 1435 server_hostname=REMOTE_HOST) 1436 s.connect((REMOTE_HOST, 443)) 1437 s.close() 1438 # This should fail because we have no verification certs 1439 ctx.verify_mode = ssl.CERT_REQUIRED 1440 s = ctx.wrap_socket(socket.socket(socket.AF_INET)) 1441 self.assertRaisesRegexp(ssl.SSLError, "certificate verify failed", 1442 s.connect, (REMOTE_HOST, 443)) 1443 s.close() 1444 # This should succeed because we specify the root cert 1445 ctx.load_verify_locations(REMOTE_ROOT_CERT) 1446 s = ctx.wrap_socket(socket.socket(socket.AF_INET)) 1447 s.connect((REMOTE_HOST, 443)) 1448 try: 1449 cert = s.getpeercert() 1450 self.assertTrue(cert) 1451 finally: 1452 s.close() 1453 1454 def test_connect_capath(self): 1455 # Verify server certificates using the `capath` argument 1456 # NOTE: the subject hashing algorithm has been changed between 1457 # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must 1458 # contain both versions of each certificate (same content, different 1459 # filename) for this test to be portable across OpenSSL releases. 1460 with support.transient_internet(REMOTE_HOST): 1461 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 1462 ctx.verify_mode = ssl.CERT_REQUIRED 1463 ctx.load_verify_locations(capath=CAPATH) 1464 s = ctx.wrap_socket(socket.socket(socket.AF_INET)) 1465 s.connect((REMOTE_HOST, 443)) 1466 try: 1467 cert = s.getpeercert() 1468 self.assertTrue(cert) 1469 finally: 1470 s.close() 1471 # Same with a bytes `capath` argument 1472 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 1473 ctx.verify_mode = ssl.CERT_REQUIRED 1474 ctx.load_verify_locations(capath=BYTES_CAPATH) 1475 s = ctx.wrap_socket(socket.socket(socket.AF_INET)) 1476 s.connect((REMOTE_HOST, 443)) 1477 try: 1478 cert = s.getpeercert() 1479 self.assertTrue(cert) 1480 finally: 1481 s.close() 1482 1483 def test_connect_cadata(self): 1484 with open(REMOTE_ROOT_CERT) as f: 1485 pem = f.read().decode('ascii') 1486 der = ssl.PEM_cert_to_DER_cert(pem) 1487 with support.transient_internet(REMOTE_HOST): 1488 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 1489 ctx.verify_mode = ssl.CERT_REQUIRED 1490 ctx.load_verify_locations(cadata=pem) 1491 with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s: 1492 s.connect((REMOTE_HOST, 443)) 1493 cert = s.getpeercert() 1494 self.assertTrue(cert) 1495 1496 # same with DER 1497 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 1498 ctx.verify_mode = ssl.CERT_REQUIRED 1499 ctx.load_verify_locations(cadata=der) 1500 with closing(ctx.wrap_socket(socket.socket(socket.AF_INET))) as s: 1501 s.connect((REMOTE_HOST, 443)) 1502 cert = s.getpeercert() 1503 self.assertTrue(cert) 1504 1505 @unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows") 1506 def test_makefile_close(self): 1507 # Issue #5238: creating a file-like object with makefile() shouldn't 1508 # delay closing the underlying "real socket" (here tested with its 1509 # file descriptor, hence skipping the test under Windows). 1510 with support.transient_internet(REMOTE_HOST): 1511 ss = ssl.wrap_socket(socket.socket(socket.AF_INET)) 1512 ss.connect((REMOTE_HOST, 443)) 1513 fd = ss.fileno() 1514 f = ss.makefile() 1515 f.close() 1516 # The fd is still open 1517 os.read(fd, 0) 1518 # Closing the SSL socket should close the fd too 1519 ss.close() 1520 gc.collect() 1521 with self.assertRaises(OSError) as e: 1522 os.read(fd, 0) 1523 self.assertEqual(e.exception.errno, errno.EBADF) 1524 1525 def test_non_blocking_handshake(self): 1526 with support.transient_internet(REMOTE_HOST): 1527 s = socket.socket(socket.AF_INET) 1528 s.connect((REMOTE_HOST, 443)) 1529 s.setblocking(False) 1530 s = ssl.wrap_socket(s, 1531 cert_reqs=ssl.CERT_NONE, 1532 do_handshake_on_connect=False) 1533 count = 0 1534 while True: 1535 try: 1536 count += 1 1537 s.do_handshake() 1538 break 1539 except ssl.SSLWantReadError: 1540 select.select([s], [], []) 1541 except ssl.SSLWantWriteError: 1542 select.select([], [s], []) 1543 s.close() 1544 if support.verbose: 1545 sys.stdout.write("\nNeeded %d calls to do_handshake() to establish session.\n" % count) 1546 1547 def test_get_server_certificate(self): 1548 def _test_get_server_certificate(host, port, cert=None): 1549 with support.transient_internet(host): 1550 pem = ssl.get_server_certificate((host, port)) 1551 if not pem: 1552 self.fail("No server certificate on %s:%s!" % (host, port)) 1553 1554 try: 1555 pem = ssl.get_server_certificate((host, port), 1556 ca_certs=CERTFILE) 1557 except ssl.SSLError as x: 1558 #should fail 1559 if support.verbose: 1560 sys.stdout.write("%s\n" % x) 1561 else: 1562 self.fail("Got server certificate %s for %s:%s!" % (pem, host, port)) 1563 pem = ssl.get_server_certificate((host, port), 1564 ca_certs=cert) 1565 if not pem: 1566 self.fail("No server certificate on %s:%s!" % (host, port)) 1567 if support.verbose: 1568 sys.stdout.write("\nVerified certificate for %s:%s is\n%s\n" % (host, port ,pem)) 1569 1570 _test_get_server_certificate(REMOTE_HOST, 443, REMOTE_ROOT_CERT) 1571 if support.IPV6_ENABLED: 1572 _test_get_server_certificate('ipv6.google.com', 443) 1573 1574 def test_ciphers(self): 1575 remote = (REMOTE_HOST, 443) 1576 with support.transient_internet(remote[0]): 1577 with closing(ssl.wrap_socket(socket.socket(socket.AF_INET), 1578 cert_reqs=ssl.CERT_NONE, ciphers="ALL")) as s: 1579 s.connect(remote) 1580 with closing(ssl.wrap_socket(socket.socket(socket.AF_INET), 1581 cert_reqs=ssl.CERT_NONE, ciphers="DEFAULT")) as s: 1582 s.connect(remote) 1583 # Error checking can happen at instantiation or when connecting 1584 with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"): 1585 with closing(socket.socket(socket.AF_INET)) as sock: 1586 s = ssl.wrap_socket(sock, 1587 cert_reqs=ssl.CERT_NONE, ciphers="^$:,;?*'dorothyx") 1588 s.connect(remote) 1589 1590 def test_algorithms(self): 1591 # Issue #8484: all algorithms should be available when verifying a 1592 # certificate. 1593 # SHA256 was added in OpenSSL 0.9.8 1594 if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15): 1595 self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION) 1596 # sha256.tbs-internet.com needs SNI to use the correct certificate 1597 if not ssl.HAS_SNI: 1598 self.skipTest("SNI needed for this test") 1599 # https://sha2.hboeck.de/ was used until 2011-01-08 (no route to host) 1600 remote = ("sha256.tbs-internet.com", 443) 1601 sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem") 1602 with support.transient_internet("sha256.tbs-internet.com"): 1603 ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1604 ctx.verify_mode = ssl.CERT_REQUIRED 1605 ctx.load_verify_locations(sha256_cert) 1606 s = ctx.wrap_socket(socket.socket(socket.AF_INET), 1607 server_hostname="sha256.tbs-internet.com") 1608 try: 1609 s.connect(remote) 1610 if support.verbose: 1611 sys.stdout.write("\nCipher with %r is %r\n" % 1612 (remote, s.cipher())) 1613 sys.stdout.write("Certificate is:\n%s\n" % 1614 pprint.pformat(s.getpeercert())) 1615 finally: 1616 s.close() 1617 1618 def test_get_ca_certs_capath(self): 1619 # capath certs are loaded on request 1620 with support.transient_internet(REMOTE_HOST): 1621 ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 1622 ctx.verify_mode = ssl.CERT_REQUIRED 1623 ctx.load_verify_locations(capath=CAPATH) 1624 self.assertEqual(ctx.get_ca_certs(), []) 1625 s = ctx.wrap_socket(socket.socket(socket.AF_INET)) 1626 s.connect((REMOTE_HOST, 443)) 1627 try: 1628 cert = s.getpeercert() 1629 self.assertTrue(cert) 1630 finally: 1631 s.close() 1632 self.assertEqual(len(ctx.get_ca_certs()), 1) 1633 1634 @needs_sni 1635 def test_context_setget(self): 1636 # Check that the context of a connected socket can be replaced. 1637 with support.transient_internet(REMOTE_HOST): 1638 ctx1 = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 1639 ctx2 = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 1640 s = socket.socket(socket.AF_INET) 1641 with closing(ctx1.wrap_socket(s)) as ss: 1642 ss.connect((REMOTE_HOST, 443)) 1643 self.assertIs(ss.context, ctx1) 1644 self.assertIs(ss._sslobj.context, ctx1) 1645 ss.context = ctx2 1646 self.assertIs(ss.context, ctx2) 1647 self.assertIs(ss._sslobj.context, ctx2) 1648 1649try: 1650 import threading 1651except ImportError: 1652 _have_threads = False 1653else: 1654 _have_threads = True 1655 1656 from test.ssl_servers import make_https_server 1657 1658 class ThreadedEchoServer(threading.Thread): 1659 1660 class ConnectionHandler(threading.Thread): 1661 1662 """A mildly complicated class, because we want it to work both 1663 with and without the SSL wrapper around the socket connection, so 1664 that we can test the STARTTLS functionality.""" 1665 1666 def __init__(self, server, connsock, addr): 1667 self.server = server 1668 self.running = False 1669 self.sock = connsock 1670 self.addr = addr 1671 self.sock.setblocking(1) 1672 self.sslconn = None 1673 threading.Thread.__init__(self) 1674 self.daemon = True 1675 1676 def wrap_conn(self): 1677 try: 1678 self.sslconn = self.server.context.wrap_socket( 1679 self.sock, server_side=True) 1680 self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol()) 1681 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol()) 1682 except socket.error as e: 1683 # We treat ConnectionResetError as though it were an 1684 # SSLError - OpenSSL on Ubuntu abruptly closes the 1685 # connection when asked to use an unsupported protocol. 1686 # 1687 # XXX Various errors can have happened here, for example 1688 # a mismatching protocol version, an invalid certificate, 1689 # or a low-level bug. This should be made more discriminating. 1690 if not isinstance(e, ssl.SSLError) and e.errno != errno.ECONNRESET: 1691 raise 1692 self.server.conn_errors.append(e) 1693 if self.server.chatty: 1694 handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n") 1695 self.running = False 1696 self.server.stop() 1697 self.close() 1698 return False 1699 else: 1700 if self.server.context.verify_mode == ssl.CERT_REQUIRED: 1701 cert = self.sslconn.getpeercert() 1702 if support.verbose and self.server.chatty: 1703 sys.stdout.write(" client cert is " + pprint.pformat(cert) + "\n") 1704 cert_binary = self.sslconn.getpeercert(True) 1705 if support.verbose and self.server.chatty: 1706 sys.stdout.write(" cert binary is " + str(len(cert_binary)) + " bytes\n") 1707 cipher = self.sslconn.cipher() 1708 if support.verbose and self.server.chatty: 1709 sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n") 1710 sys.stdout.write(" server: selected protocol is now " 1711 + str(self.sslconn.selected_npn_protocol()) + "\n") 1712 return True 1713 1714 def read(self): 1715 if self.sslconn: 1716 return self.sslconn.read() 1717 else: 1718 return self.sock.recv(1024) 1719 1720 def write(self, bytes): 1721 if self.sslconn: 1722 return self.sslconn.write(bytes) 1723 else: 1724 return self.sock.send(bytes) 1725 1726 def close(self): 1727 if self.sslconn: 1728 self.sslconn.close() 1729 else: 1730 self.sock.close() 1731 1732 def run(self): 1733 self.running = True 1734 if not self.server.starttls_server: 1735 if not self.wrap_conn(): 1736 return 1737 while self.running: 1738 try: 1739 msg = self.read() 1740 stripped = msg.strip() 1741 if not stripped: 1742 # eof, so quit this handler 1743 self.running = False 1744 self.close() 1745 elif stripped == b'over': 1746 if support.verbose and self.server.connectionchatty: 1747 sys.stdout.write(" server: client closed connection\n") 1748 self.close() 1749 return 1750 elif (self.server.starttls_server and 1751 stripped == b'STARTTLS'): 1752 if support.verbose and self.server.connectionchatty: 1753 sys.stdout.write(" server: read STARTTLS from client, sending OK...\n") 1754 self.write(b"OK\n") 1755 if not self.wrap_conn(): 1756 return 1757 elif (self.server.starttls_server and self.sslconn 1758 and stripped == b'ENDTLS'): 1759 if support.verbose and self.server.connectionchatty: 1760 sys.stdout.write(" server: read ENDTLS from client, sending OK...\n") 1761 self.write(b"OK\n") 1762 self.sock = self.sslconn.unwrap() 1763 self.sslconn = None 1764 if support.verbose and self.server.connectionchatty: 1765 sys.stdout.write(" server: connection is now unencrypted...\n") 1766 elif stripped == b'CB tls-unique': 1767 if support.verbose and self.server.connectionchatty: 1768 sys.stdout.write(" server: read CB tls-unique from client, sending our CB data...\n") 1769 data = self.sslconn.get_channel_binding("tls-unique") 1770 self.write(repr(data).encode("us-ascii") + b"\n") 1771 else: 1772 if (support.verbose and 1773 self.server.connectionchatty): 1774 ctype = (self.sslconn and "encrypted") or "unencrypted" 1775 sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n" 1776 % (msg, ctype, msg.lower(), ctype)) 1777 self.write(msg.lower()) 1778 except ssl.SSLError: 1779 if self.server.chatty: 1780 handle_error("Test server failure:\n") 1781 self.close() 1782 self.running = False 1783 # normally, we'd just stop here, but for the test 1784 # harness, we want to stop the server 1785 self.server.stop() 1786 1787 def __init__(self, certificate=None, ssl_version=None, 1788 certreqs=None, cacerts=None, 1789 chatty=True, connectionchatty=False, starttls_server=False, 1790 npn_protocols=None, alpn_protocols=None, 1791 ciphers=None, context=None): 1792 if context: 1793 self.context = context 1794 else: 1795 self.context = ssl.SSLContext(ssl_version 1796 if ssl_version is not None 1797 else ssl.PROTOCOL_TLSv1) 1798 self.context.verify_mode = (certreqs if certreqs is not None 1799 else ssl.CERT_NONE) 1800 if cacerts: 1801 self.context.load_verify_locations(cacerts) 1802 if certificate: 1803 self.context.load_cert_chain(certificate) 1804 if npn_protocols: 1805 self.context.set_npn_protocols(npn_protocols) 1806 if alpn_protocols: 1807 self.context.set_alpn_protocols(alpn_protocols) 1808 if ciphers: 1809 self.context.set_ciphers(ciphers) 1810 self.chatty = chatty 1811 self.connectionchatty = connectionchatty 1812 self.starttls_server = starttls_server 1813 self.sock = socket.socket() 1814 self.port = support.bind_port(self.sock) 1815 self.flag = None 1816 self.active = False 1817 self.selected_npn_protocols = [] 1818 self.selected_alpn_protocols = [] 1819 self.conn_errors = [] 1820 threading.Thread.__init__(self) 1821 self.daemon = True 1822 1823 def __enter__(self): 1824 self.start(threading.Event()) 1825 self.flag.wait() 1826 return self 1827 1828 def __exit__(self, *args): 1829 self.stop() 1830 self.join() 1831 1832 def start(self, flag=None): 1833 self.flag = flag 1834 threading.Thread.start(self) 1835 1836 def run(self): 1837 self.sock.settimeout(0.05) 1838 self.sock.listen(5) 1839 self.active = True 1840 if self.flag: 1841 # signal an event 1842 self.flag.set() 1843 while self.active: 1844 try: 1845 newconn, connaddr = self.sock.accept() 1846 if support.verbose and self.chatty: 1847 sys.stdout.write(' server: new connection from ' 1848 + repr(connaddr) + '\n') 1849 handler = self.ConnectionHandler(self, newconn, connaddr) 1850 handler.start() 1851 handler.join() 1852 except socket.timeout: 1853 pass 1854 except KeyboardInterrupt: 1855 self.stop() 1856 self.sock.close() 1857 1858 def stop(self): 1859 self.active = False 1860 1861 class AsyncoreEchoServer(threading.Thread): 1862 1863 class EchoServer(asyncore.dispatcher): 1864 1865 class ConnectionHandler(asyncore.dispatcher_with_send): 1866 1867 def __init__(self, conn, certfile): 1868 self.socket = ssl.wrap_socket(conn, server_side=True, 1869 certfile=certfile, 1870 do_handshake_on_connect=False) 1871 asyncore.dispatcher_with_send.__init__(self, self.socket) 1872 self._ssl_accepting = True 1873 self._do_ssl_handshake() 1874 1875 def readable(self): 1876 if isinstance(self.socket, ssl.SSLSocket): 1877 while self.socket.pending() > 0: 1878 self.handle_read_event() 1879 return True 1880 1881 def _do_ssl_handshake(self): 1882 try: 1883 self.socket.do_handshake() 1884 except (ssl.SSLWantReadError, ssl.SSLWantWriteError): 1885 return 1886 except ssl.SSLEOFError: 1887 return self.handle_close() 1888 except ssl.SSLError: 1889 raise 1890 except socket.error, err: 1891 if err.args[0] == errno.ECONNABORTED: 1892 return self.handle_close() 1893 else: 1894 self._ssl_accepting = False 1895 1896 def handle_read(self): 1897 if self._ssl_accepting: 1898 self._do_ssl_handshake() 1899 else: 1900 data = self.recv(1024) 1901 if support.verbose: 1902 sys.stdout.write(" server: read %s from client\n" % repr(data)) 1903 if not data: 1904 self.close() 1905 else: 1906 self.send(data.lower()) 1907 1908 def handle_close(self): 1909 self.close() 1910 if support.verbose: 1911 sys.stdout.write(" server: closed connection %s\n" % self.socket) 1912 1913 def handle_error(self): 1914 raise 1915 1916 def __init__(self, certfile): 1917 self.certfile = certfile 1918 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 1919 self.port = support.bind_port(sock, '') 1920 asyncore.dispatcher.__init__(self, sock) 1921 self.listen(5) 1922 1923 def handle_accept(self): 1924 sock_obj, addr = self.accept() 1925 if support.verbose: 1926 sys.stdout.write(" server: new connection from %s:%s\n" %addr) 1927 self.ConnectionHandler(sock_obj, self.certfile) 1928 1929 def handle_error(self): 1930 raise 1931 1932 def __init__(self, certfile): 1933 self.flag = None 1934 self.active = False 1935 self.server = self.EchoServer(certfile) 1936 self.port = self.server.port 1937 threading.Thread.__init__(self) 1938 self.daemon = True 1939 1940 def __str__(self): 1941 return "<%s %s>" % (self.__class__.__name__, self.server) 1942 1943 def __enter__(self): 1944 self.start(threading.Event()) 1945 self.flag.wait() 1946 return self 1947 1948 def __exit__(self, *args): 1949 if support.verbose: 1950 sys.stdout.write(" cleanup: stopping server.\n") 1951 self.stop() 1952 if support.verbose: 1953 sys.stdout.write(" cleanup: joining server thread.\n") 1954 self.join() 1955 if support.verbose: 1956 sys.stdout.write(" cleanup: successfully joined.\n") 1957 1958 def start(self, flag=None): 1959 self.flag = flag 1960 threading.Thread.start(self) 1961 1962 def run(self): 1963 self.active = True 1964 if self.flag: 1965 self.flag.set() 1966 while self.active: 1967 try: 1968 asyncore.loop(1) 1969 except: 1970 pass 1971 1972 def stop(self): 1973 self.active = False 1974 self.server.close() 1975 1976 def server_params_test(client_context, server_context, indata=b"FOO\n", 1977 chatty=True, connectionchatty=False, sni_name=None): 1978 """ 1979 Launch a server, connect a client to it and try various reads 1980 and writes. 1981 """ 1982 stats = {} 1983 server = ThreadedEchoServer(context=server_context, 1984 chatty=chatty, 1985 connectionchatty=False) 1986 with server: 1987 with closing(client_context.wrap_socket(socket.socket(), 1988 server_hostname=sni_name)) as s: 1989 s.connect((HOST, server.port)) 1990 for arg in [indata, bytearray(indata), memoryview(indata)]: 1991 if connectionchatty: 1992 if support.verbose: 1993 sys.stdout.write( 1994 " client: sending %r...\n" % indata) 1995 s.write(arg) 1996 outdata = s.read() 1997 if connectionchatty: 1998 if support.verbose: 1999 sys.stdout.write(" client: read %r\n" % outdata) 2000 if outdata != indata.lower(): 2001 raise AssertionError( 2002 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n" 2003 % (outdata[:20], len(outdata), 2004 indata[:20].lower(), len(indata))) 2005 s.write(b"over\n") 2006 if connectionchatty: 2007 if support.verbose: 2008 sys.stdout.write(" client: closing connection.\n") 2009 stats.update({ 2010 'compression': s.compression(), 2011 'cipher': s.cipher(), 2012 'peercert': s.getpeercert(), 2013 'client_alpn_protocol': s.selected_alpn_protocol(), 2014 'client_npn_protocol': s.selected_npn_protocol(), 2015 'version': s.version(), 2016 }) 2017 s.close() 2018 stats['server_alpn_protocols'] = server.selected_alpn_protocols 2019 stats['server_npn_protocols'] = server.selected_npn_protocols 2020 return stats 2021 2022 def try_protocol_combo(server_protocol, client_protocol, expect_success, 2023 certsreqs=None, server_options=0, client_options=0): 2024 """ 2025 Try to SSL-connect using *client_protocol* to *server_protocol*. 2026 If *expect_success* is true, assert that the connection succeeds, 2027 if it's false, assert that the connection fails. 2028 Also, if *expect_success* is a string, assert that it is the protocol 2029 version actually used by the connection. 2030 """ 2031 if certsreqs is None: 2032 certsreqs = ssl.CERT_NONE 2033 certtype = { 2034 ssl.CERT_NONE: "CERT_NONE", 2035 ssl.CERT_OPTIONAL: "CERT_OPTIONAL", 2036 ssl.CERT_REQUIRED: "CERT_REQUIRED", 2037 }[certsreqs] 2038 if support.verbose: 2039 formatstr = (expect_success and " %s->%s %s\n") or " {%s->%s} %s\n" 2040 sys.stdout.write(formatstr % 2041 (ssl.get_protocol_name(client_protocol), 2042 ssl.get_protocol_name(server_protocol), 2043 certtype)) 2044 client_context = ssl.SSLContext(client_protocol) 2045 client_context.options |= client_options 2046 server_context = ssl.SSLContext(server_protocol) 2047 server_context.options |= server_options 2048 2049 # NOTE: we must enable "ALL" ciphers on the client, otherwise an 2050 # SSLv23 client will send an SSLv3 hello (rather than SSLv2) 2051 # starting from OpenSSL 1.0.0 (see issue #8322). 2052 if client_context.protocol == ssl.PROTOCOL_SSLv23: 2053 client_context.set_ciphers("ALL") 2054 2055 for ctx in (client_context, server_context): 2056 ctx.verify_mode = certsreqs 2057 ctx.load_cert_chain(CERTFILE) 2058 ctx.load_verify_locations(CERTFILE) 2059 try: 2060 stats = server_params_test(client_context, server_context, 2061 chatty=False, connectionchatty=False) 2062 # Protocol mismatch can result in either an SSLError, or a 2063 # "Connection reset by peer" error. 2064 except ssl.SSLError: 2065 if expect_success: 2066 raise 2067 except socket.error as e: 2068 if expect_success or e.errno != errno.ECONNRESET: 2069 raise 2070 else: 2071 if not expect_success: 2072 raise AssertionError( 2073 "Client protocol %s succeeded with server protocol %s!" 2074 % (ssl.get_protocol_name(client_protocol), 2075 ssl.get_protocol_name(server_protocol))) 2076 elif (expect_success is not True 2077 and expect_success != stats['version']): 2078 raise AssertionError("version mismatch: expected %r, got %r" 2079 % (expect_success, stats['version'])) 2080 2081 2082 class ThreadedTests(unittest.TestCase): 2083 2084 @skip_if_broken_ubuntu_ssl 2085 def test_echo(self): 2086 """Basic test of an SSL client connecting to a server""" 2087 if support.verbose: 2088 sys.stdout.write("\n") 2089 for protocol in PROTOCOLS: 2090 context = ssl.SSLContext(protocol) 2091 context.load_cert_chain(CERTFILE) 2092 server_params_test(context, context, 2093 chatty=True, connectionchatty=True) 2094 2095 def test_getpeercert(self): 2096 if support.verbose: 2097 sys.stdout.write("\n") 2098 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 2099 context.verify_mode = ssl.CERT_REQUIRED 2100 context.load_verify_locations(CERTFILE) 2101 context.load_cert_chain(CERTFILE) 2102 server = ThreadedEchoServer(context=context, chatty=False) 2103 with server: 2104 s = context.wrap_socket(socket.socket(), 2105 do_handshake_on_connect=False) 2106 s.connect((HOST, server.port)) 2107 # getpeercert() raise ValueError while the handshake isn't 2108 # done. 2109 with self.assertRaises(ValueError): 2110 s.getpeercert() 2111 s.do_handshake() 2112 cert = s.getpeercert() 2113 self.assertTrue(cert, "Can't get peer certificate.") 2114 cipher = s.cipher() 2115 if support.verbose: 2116 sys.stdout.write(pprint.pformat(cert) + '\n') 2117 sys.stdout.write("Connection cipher is " + str(cipher) + '.\n') 2118 if 'subject' not in cert: 2119 self.fail("No subject field in certificate: %s." % 2120 pprint.pformat(cert)) 2121 if ((('organizationName', 'Python Software Foundation'),) 2122 not in cert['subject']): 2123 self.fail( 2124 "Missing or invalid 'organizationName' field in certificate subject; " 2125 "should be 'Python Software Foundation'.") 2126 self.assertIn('notBefore', cert) 2127 self.assertIn('notAfter', cert) 2128 before = ssl.cert_time_to_seconds(cert['notBefore']) 2129 after = ssl.cert_time_to_seconds(cert['notAfter']) 2130 self.assertLess(before, after) 2131 s.close() 2132 2133 @unittest.skipUnless(have_verify_flags(), 2134 "verify_flags need OpenSSL > 0.9.8") 2135 def test_crl_check(self): 2136 if support.verbose: 2137 sys.stdout.write("\n") 2138 2139 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 2140 server_context.load_cert_chain(SIGNED_CERTFILE) 2141 2142 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 2143 context.verify_mode = ssl.CERT_REQUIRED 2144 context.load_verify_locations(SIGNING_CA) 2145 tf = getattr(ssl, "VERIFY_X509_TRUSTED_FIRST", 0) 2146 self.assertEqual(context.verify_flags, ssl.VERIFY_DEFAULT | tf) 2147 2148 # VERIFY_DEFAULT should pass 2149 server = ThreadedEchoServer(context=server_context, chatty=True) 2150 with server: 2151 with closing(context.wrap_socket(socket.socket())) as s: 2152 s.connect((HOST, server.port)) 2153 cert = s.getpeercert() 2154 self.assertTrue(cert, "Can't get peer certificate.") 2155 2156 # VERIFY_CRL_CHECK_LEAF without a loaded CRL file fails 2157 context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF 2158 2159 server = ThreadedEchoServer(context=server_context, chatty=True) 2160 with server: 2161 with closing(context.wrap_socket(socket.socket())) as s: 2162 with self.assertRaisesRegexp(ssl.SSLError, 2163 "certificate verify failed"): 2164 s.connect((HOST, server.port)) 2165 2166 # now load a CRL file. The CRL file is signed by the CA. 2167 context.load_verify_locations(CRLFILE) 2168 2169 server = ThreadedEchoServer(context=server_context, chatty=True) 2170 with server: 2171 with closing(context.wrap_socket(socket.socket())) as s: 2172 s.connect((HOST, server.port)) 2173 cert = s.getpeercert() 2174 self.assertTrue(cert, "Can't get peer certificate.") 2175 2176 def test_check_hostname(self): 2177 if support.verbose: 2178 sys.stdout.write("\n") 2179 2180 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 2181 server_context.load_cert_chain(SIGNED_CERTFILE) 2182 2183 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 2184 context.verify_mode = ssl.CERT_REQUIRED 2185 context.check_hostname = True 2186 context.load_verify_locations(SIGNING_CA) 2187 2188 # correct hostname should verify 2189 server = ThreadedEchoServer(context=server_context, chatty=True) 2190 with server: 2191 with closing(context.wrap_socket(socket.socket(), 2192 server_hostname="localhost")) as s: 2193 s.connect((HOST, server.port)) 2194 cert = s.getpeercert() 2195 self.assertTrue(cert, "Can't get peer certificate.") 2196 2197 # incorrect hostname should raise an exception 2198 server = ThreadedEchoServer(context=server_context, chatty=True) 2199 with server: 2200 with closing(context.wrap_socket(socket.socket(), 2201 server_hostname="invalid")) as s: 2202 with self.assertRaisesRegexp(ssl.CertificateError, 2203 "hostname 'invalid' doesn't match u?'localhost'"): 2204 s.connect((HOST, server.port)) 2205 2206 # missing server_hostname arg should cause an exception, too 2207 server = ThreadedEchoServer(context=server_context, chatty=True) 2208 with server: 2209 with closing(socket.socket()) as s: 2210 with self.assertRaisesRegexp(ValueError, 2211 "check_hostname requires server_hostname"): 2212 context.wrap_socket(s) 2213 2214 def test_wrong_cert(self): 2215 """Connecting when the server rejects the client's certificate 2216 2217 Launch a server with CERT_REQUIRED, and check that trying to 2218 connect to it with a wrong client certificate fails. 2219 """ 2220 certfile = os.path.join(os.path.dirname(__file__) or os.curdir, 2221 "wrongcert.pem") 2222 server = ThreadedEchoServer(CERTFILE, 2223 certreqs=ssl.CERT_REQUIRED, 2224 cacerts=CERTFILE, chatty=False, 2225 connectionchatty=False) 2226 with server, \ 2227 closing(socket.socket()) as sock, \ 2228 closing(ssl.wrap_socket(sock, 2229 certfile=certfile, 2230 ssl_version=ssl.PROTOCOL_TLSv1)) as s: 2231 try: 2232 # Expect either an SSL error about the server rejecting 2233 # the connection, or a low-level connection reset (which 2234 # sometimes happens on Windows) 2235 s.connect((HOST, server.port)) 2236 except ssl.SSLError as e: 2237 if support.verbose: 2238 sys.stdout.write("\nSSLError is %r\n" % e) 2239 except socket.error as e: 2240 if e.errno != errno.ECONNRESET: 2241 raise 2242 if support.verbose: 2243 sys.stdout.write("\nsocket.error is %r\n" % e) 2244 else: 2245 self.fail("Use of invalid cert should have failed!") 2246 2247 def test_rude_shutdown(self): 2248 """A brutal shutdown of an SSL server should raise an OSError 2249 in the client when attempting handshake. 2250 """ 2251 listener_ready = threading.Event() 2252 listener_gone = threading.Event() 2253 2254 s = socket.socket() 2255 port = support.bind_port(s, HOST) 2256 2257 # `listener` runs in a thread. It sits in an accept() until 2258 # the main thread connects. Then it rudely closes the socket, 2259 # and sets Event `listener_gone` to let the main thread know 2260 # the socket is gone. 2261 def listener(): 2262 s.listen(5) 2263 listener_ready.set() 2264 newsock, addr = s.accept() 2265 newsock.close() 2266 s.close() 2267 listener_gone.set() 2268 2269 def connector(): 2270 listener_ready.wait() 2271 with closing(socket.socket()) as c: 2272 c.connect((HOST, port)) 2273 listener_gone.wait() 2274 try: 2275 ssl_sock = ssl.wrap_socket(c) 2276 except socket.error: 2277 pass 2278 else: 2279 self.fail('connecting to closed SSL socket should have failed') 2280 2281 t = threading.Thread(target=listener) 2282 t.start() 2283 try: 2284 connector() 2285 finally: 2286 t.join() 2287 2288 @skip_if_broken_ubuntu_ssl 2289 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv2'), 2290 "OpenSSL is compiled without SSLv2 support") 2291 def test_protocol_sslv2(self): 2292 """Connecting to an SSLv2 server with various client options""" 2293 if support.verbose: 2294 sys.stdout.write("\n") 2295 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True) 2296 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL) 2297 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED) 2298 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False) 2299 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False) 2300 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False) 2301 # SSLv23 client with specific SSL options 2302 if no_sslv2_implies_sslv3_hello(): 2303 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs 2304 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False, 2305 client_options=ssl.OP_NO_SSLv2) 2306 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False, 2307 client_options=ssl.OP_NO_SSLv3) 2308 try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False, 2309 client_options=ssl.OP_NO_TLSv1) 2310 2311 @skip_if_broken_ubuntu_ssl 2312 def test_protocol_sslv23(self): 2313 """Connecting to an SSLv23 server with various client options""" 2314 if support.verbose: 2315 sys.stdout.write("\n") 2316 if hasattr(ssl, 'PROTOCOL_SSLv2'): 2317 try: 2318 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv2, True) 2319 except socket.error as x: 2320 # this fails on some older versions of OpenSSL (0.9.7l, for instance) 2321 if support.verbose: 2322 sys.stdout.write( 2323 " SSL2 client to SSL23 server test unexpectedly failed:\n %s\n" 2324 % str(x)) 2325 if hasattr(ssl, 'PROTOCOL_SSLv3'): 2326 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False) 2327 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True) 2328 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1') 2329 2330 if hasattr(ssl, 'PROTOCOL_SSLv3'): 2331 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False, ssl.CERT_OPTIONAL) 2332 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_OPTIONAL) 2333 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL) 2334 2335 if hasattr(ssl, 'PROTOCOL_SSLv3'): 2336 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False, ssl.CERT_REQUIRED) 2337 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, ssl.CERT_REQUIRED) 2338 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED) 2339 2340 # Server with specific SSL options 2341 if hasattr(ssl, 'PROTOCOL_SSLv3'): 2342 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv3, False, 2343 server_options=ssl.OP_NO_SSLv3) 2344 # Will choose TLSv1 2345 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_SSLv23, True, 2346 server_options=ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3) 2347 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1, False, 2348 server_options=ssl.OP_NO_TLSv1) 2349 2350 2351 @skip_if_broken_ubuntu_ssl 2352 @unittest.skipUnless(hasattr(ssl, 'PROTOCOL_SSLv3'), 2353 "OpenSSL is compiled without SSLv3 support") 2354 def test_protocol_sslv3(self): 2355 """Connecting to an SSLv3 server with various client options""" 2356 if support.verbose: 2357 sys.stdout.write("\n") 2358 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3') 2359 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_OPTIONAL) 2360 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv3, 'SSLv3', ssl.CERT_REQUIRED) 2361 if hasattr(ssl, 'PROTOCOL_SSLv2'): 2362 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv2, False) 2363 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, False, 2364 client_options=ssl.OP_NO_SSLv3) 2365 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_TLSv1, False) 2366 if no_sslv2_implies_sslv3_hello(): 2367 # No SSLv2 => client will use an SSLv3 hello on recent OpenSSLs 2368 try_protocol_combo(ssl.PROTOCOL_SSLv3, ssl.PROTOCOL_SSLv23, 2369 False, client_options=ssl.OP_NO_SSLv2) 2370 2371 @skip_if_broken_ubuntu_ssl 2372 def test_protocol_tlsv1(self): 2373 """Connecting to a TLSv1 server with various client options""" 2374 if support.verbose: 2375 sys.stdout.write("\n") 2376 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1') 2377 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_OPTIONAL) 2378 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1, 'TLSv1', ssl.CERT_REQUIRED) 2379 if hasattr(ssl, 'PROTOCOL_SSLv2'): 2380 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv2, False) 2381 if hasattr(ssl, 'PROTOCOL_SSLv3'): 2382 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv3, False) 2383 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_SSLv23, False, 2384 client_options=ssl.OP_NO_TLSv1) 2385 2386 @skip_if_broken_ubuntu_ssl 2387 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_1"), 2388 "TLS version 1.1 not supported.") 2389 def test_protocol_tlsv1_1(self): 2390 """Connecting to a TLSv1.1 server with various client options. 2391 Testing against older TLS versions.""" 2392 if support.verbose: 2393 sys.stdout.write("\n") 2394 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1') 2395 if hasattr(ssl, 'PROTOCOL_SSLv2'): 2396 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv2, False) 2397 if hasattr(ssl, 'PROTOCOL_SSLv3'): 2398 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv3, False) 2399 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_SSLv23, False, 2400 client_options=ssl.OP_NO_TLSv1_1) 2401 2402 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_1, 'TLSv1.1') 2403 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1, False) 2404 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_1, False) 2405 2406 2407 @skip_if_broken_ubuntu_ssl 2408 @unittest.skipUnless(hasattr(ssl, "PROTOCOL_TLSv1_2"), 2409 "TLS version 1.2 not supported.") 2410 def test_protocol_tlsv1_2(self): 2411 """Connecting to a TLSv1.2 server with various client options. 2412 Testing against older TLS versions.""" 2413 if support.verbose: 2414 sys.stdout.write("\n") 2415 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2', 2416 server_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2, 2417 client_options=ssl.OP_NO_SSLv3|ssl.OP_NO_SSLv2,) 2418 if hasattr(ssl, 'PROTOCOL_SSLv2'): 2419 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv2, False) 2420 if hasattr(ssl, 'PROTOCOL_SSLv3'): 2421 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv3, False) 2422 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_SSLv23, False, 2423 client_options=ssl.OP_NO_TLSv1_2) 2424 2425 try_protocol_combo(ssl.PROTOCOL_SSLv23, ssl.PROTOCOL_TLSv1_2, 'TLSv1.2') 2426 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1, False) 2427 try_protocol_combo(ssl.PROTOCOL_TLSv1, ssl.PROTOCOL_TLSv1_2, False) 2428 try_protocol_combo(ssl.PROTOCOL_TLSv1_2, ssl.PROTOCOL_TLSv1_1, False) 2429 try_protocol_combo(ssl.PROTOCOL_TLSv1_1, ssl.PROTOCOL_TLSv1_2, False) 2430 2431 def test_starttls(self): 2432 """Switching from clear text to encrypted and back again.""" 2433 msgs = (b"msg 1", b"MSG 2", b"STARTTLS", b"MSG 3", b"msg 4", b"ENDTLS", b"msg 5", b"msg 6") 2434 2435 server = ThreadedEchoServer(CERTFILE, 2436 ssl_version=ssl.PROTOCOL_TLSv1, 2437 starttls_server=True, 2438 chatty=True, 2439 connectionchatty=True) 2440 wrapped = False 2441 with server: 2442 s = socket.socket() 2443 s.setblocking(1) 2444 s.connect((HOST, server.port)) 2445 if support.verbose: 2446 sys.stdout.write("\n") 2447 for indata in msgs: 2448 if support.verbose: 2449 sys.stdout.write( 2450 " client: sending %r...\n" % indata) 2451 if wrapped: 2452 conn.write(indata) 2453 outdata = conn.read() 2454 else: 2455 s.send(indata) 2456 outdata = s.recv(1024) 2457 msg = outdata.strip().lower() 2458 if indata == b"STARTTLS" and msg.startswith(b"ok"): 2459 # STARTTLS ok, switch to secure mode 2460 if support.verbose: 2461 sys.stdout.write( 2462 " client: read %r from server, starting TLS...\n" 2463 % msg) 2464 conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1) 2465 wrapped = True 2466 elif indata == b"ENDTLS" and msg.startswith(b"ok"): 2467 # ENDTLS ok, switch back to clear text 2468 if support.verbose: 2469 sys.stdout.write( 2470 " client: read %r from server, ending TLS...\n" 2471 % msg) 2472 s = conn.unwrap() 2473 wrapped = False 2474 else: 2475 if support.verbose: 2476 sys.stdout.write( 2477 " client: read %r from server\n" % msg) 2478 if support.verbose: 2479 sys.stdout.write(" client: closing connection.\n") 2480 if wrapped: 2481 conn.write(b"over\n") 2482 else: 2483 s.send(b"over\n") 2484 if wrapped: 2485 conn.close() 2486 else: 2487 s.close() 2488 2489 def test_socketserver(self): 2490 """Using a SocketServer to create and manage SSL connections.""" 2491 server = make_https_server(self, certfile=CERTFILE) 2492 # try to connect 2493 if support.verbose: 2494 sys.stdout.write('\n') 2495 with open(CERTFILE, 'rb') as f: 2496 d1 = f.read() 2497 d2 = '' 2498 # now fetch the same data from the HTTPS server 2499 url = 'https://localhost:%d/%s' % ( 2500 server.port, os.path.split(CERTFILE)[1]) 2501 context = ssl.create_default_context(cafile=CERTFILE) 2502 f = urllib2.urlopen(url, context=context) 2503 try: 2504 dlen = f.info().getheader("content-length") 2505 if dlen and (int(dlen) > 0): 2506 d2 = f.read(int(dlen)) 2507 if support.verbose: 2508 sys.stdout.write( 2509 " client: read %d bytes from remote server '%s'\n" 2510 % (len(d2), server)) 2511 finally: 2512 f.close() 2513 self.assertEqual(d1, d2) 2514 2515 def test_asyncore_server(self): 2516 """Check the example asyncore integration.""" 2517 if support.verbose: 2518 sys.stdout.write("\n") 2519 2520 indata = b"FOO\n" 2521 server = AsyncoreEchoServer(CERTFILE) 2522 with server: 2523 s = ssl.wrap_socket(socket.socket()) 2524 s.connect(('127.0.0.1', server.port)) 2525 if support.verbose: 2526 sys.stdout.write( 2527 " client: sending %r...\n" % indata) 2528 s.write(indata) 2529 outdata = s.read() 2530 if support.verbose: 2531 sys.stdout.write(" client: read %r\n" % outdata) 2532 if outdata != indata.lower(): 2533 self.fail( 2534 "bad data <<%r>> (%d) received; expected <<%r>> (%d)\n" 2535 % (outdata[:20], len(outdata), 2536 indata[:20].lower(), len(indata))) 2537 s.write(b"over\n") 2538 if support.verbose: 2539 sys.stdout.write(" client: closing connection.\n") 2540 s.close() 2541 if support.verbose: 2542 sys.stdout.write(" client: connection closed.\n") 2543 2544 def test_recv_send(self): 2545 """Test recv(), send() and friends.""" 2546 if support.verbose: 2547 sys.stdout.write("\n") 2548 2549 server = ThreadedEchoServer(CERTFILE, 2550 certreqs=ssl.CERT_NONE, 2551 ssl_version=ssl.PROTOCOL_TLSv1, 2552 cacerts=CERTFILE, 2553 chatty=True, 2554 connectionchatty=False) 2555 with server: 2556 s = ssl.wrap_socket(socket.socket(), 2557 server_side=False, 2558 certfile=CERTFILE, 2559 ca_certs=CERTFILE, 2560 cert_reqs=ssl.CERT_NONE, 2561 ssl_version=ssl.PROTOCOL_TLSv1) 2562 s.connect((HOST, server.port)) 2563 # helper methods for standardising recv* method signatures 2564 def _recv_into(): 2565 b = bytearray(b"\0"*100) 2566 count = s.recv_into(b) 2567 return b[:count] 2568 2569 def _recvfrom_into(): 2570 b = bytearray(b"\0"*100) 2571 count, addr = s.recvfrom_into(b) 2572 return b[:count] 2573 2574 # (name, method, whether to expect success, *args) 2575 send_methods = [ 2576 ('send', s.send, True, []), 2577 ('sendto', s.sendto, False, ["some.address"]), 2578 ('sendall', s.sendall, True, []), 2579 ] 2580 recv_methods = [ 2581 ('recv', s.recv, True, []), 2582 ('recvfrom', s.recvfrom, False, ["some.address"]), 2583 ('recv_into', _recv_into, True, []), 2584 ('recvfrom_into', _recvfrom_into, False, []), 2585 ] 2586 data_prefix = u"PREFIX_" 2587 2588 for meth_name, send_meth, expect_success, args in send_methods: 2589 indata = (data_prefix + meth_name).encode('ascii') 2590 try: 2591 send_meth(indata, *args) 2592 outdata = s.read() 2593 if outdata != indata.lower(): 2594 self.fail( 2595 "While sending with <<{name:s}>> bad data " 2596 "<<{outdata:r}>> ({nout:d}) received; " 2597 "expected <<{indata:r}>> ({nin:d})\n".format( 2598 name=meth_name, outdata=outdata[:20], 2599 nout=len(outdata), 2600 indata=indata[:20], nin=len(indata) 2601 ) 2602 ) 2603 except ValueError as e: 2604 if expect_success: 2605 self.fail( 2606 "Failed to send with method <<{name:s}>>; " 2607 "expected to succeed.\n".format(name=meth_name) 2608 ) 2609 if not str(e).startswith(meth_name): 2610 self.fail( 2611 "Method <<{name:s}>> failed with unexpected " 2612 "exception message: {exp:s}\n".format( 2613 name=meth_name, exp=e 2614 ) 2615 ) 2616 2617 for meth_name, recv_meth, expect_success, args in recv_methods: 2618 indata = (data_prefix + meth_name).encode('ascii') 2619 try: 2620 s.send(indata) 2621 outdata = recv_meth(*args) 2622 if outdata != indata.lower(): 2623 self.fail( 2624 "While receiving with <<{name:s}>> bad data " 2625 "<<{outdata:r}>> ({nout:d}) received; " 2626 "expected <<{indata:r}>> ({nin:d})\n".format( 2627 name=meth_name, outdata=outdata[:20], 2628 nout=len(outdata), 2629 indata=indata[:20], nin=len(indata) 2630 ) 2631 ) 2632 except ValueError as e: 2633 if expect_success: 2634 self.fail( 2635 "Failed to receive with method <<{name:s}>>; " 2636 "expected to succeed.\n".format(name=meth_name) 2637 ) 2638 if not str(e).startswith(meth_name): 2639 self.fail( 2640 "Method <<{name:s}>> failed with unexpected " 2641 "exception message: {exp:s}\n".format( 2642 name=meth_name, exp=e 2643 ) 2644 ) 2645 # consume data 2646 s.read() 2647 2648 # read(-1, buffer) is supported, even though read(-1) is not 2649 data = b"data" 2650 s.send(data) 2651 buffer = bytearray(len(data)) 2652 self.assertEqual(s.read(-1, buffer), len(data)) 2653 self.assertEqual(buffer, data) 2654 2655 s.write(b"over\n") 2656 2657 self.assertRaises(ValueError, s.recv, -1) 2658 self.assertRaises(ValueError, s.read, -1) 2659 2660 s.close() 2661 2662 def test_recv_zero(self): 2663 server = ThreadedEchoServer(CERTFILE) 2664 server.__enter__() 2665 self.addCleanup(server.__exit__, None, None) 2666 s = socket.create_connection((HOST, server.port)) 2667 self.addCleanup(s.close) 2668 s = ssl.wrap_socket(s, suppress_ragged_eofs=False) 2669 self.addCleanup(s.close) 2670 2671 # recv/read(0) should return no data 2672 s.send(b"data") 2673 self.assertEqual(s.recv(0), b"") 2674 self.assertEqual(s.read(0), b"") 2675 self.assertEqual(s.read(), b"data") 2676 2677 # Should not block if the other end sends no data 2678 s.setblocking(False) 2679 self.assertEqual(s.recv(0), b"") 2680 self.assertEqual(s.recv_into(bytearray()), 0) 2681 2682 def test_handshake_timeout(self): 2683 # Issue #5103: SSL handshake must respect the socket timeout 2684 server = socket.socket(socket.AF_INET) 2685 host = "127.0.0.1" 2686 port = support.bind_port(server) 2687 started = threading.Event() 2688 finish = False 2689 2690 def serve(): 2691 server.listen(5) 2692 started.set() 2693 conns = [] 2694 while not finish: 2695 r, w, e = select.select([server], [], [], 0.1) 2696 if server in r: 2697 # Let the socket hang around rather than having 2698 # it closed by garbage collection. 2699 conns.append(server.accept()[0]) 2700 for sock in conns: 2701 sock.close() 2702 2703 t = threading.Thread(target=serve) 2704 t.start() 2705 started.wait() 2706 2707 try: 2708 try: 2709 c = socket.socket(socket.AF_INET) 2710 c.settimeout(0.2) 2711 c.connect((host, port)) 2712 # Will attempt handshake and time out 2713 self.assertRaisesRegexp(ssl.SSLError, "timed out", 2714 ssl.wrap_socket, c) 2715 finally: 2716 c.close() 2717 try: 2718 c = socket.socket(socket.AF_INET) 2719 c = ssl.wrap_socket(c) 2720 c.settimeout(0.2) 2721 # Will attempt handshake and time out 2722 self.assertRaisesRegexp(ssl.SSLError, "timed out", 2723 c.connect, (host, port)) 2724 finally: 2725 c.close() 2726 finally: 2727 finish = True 2728 t.join() 2729 server.close() 2730 2731 def test_server_accept(self): 2732 # Issue #16357: accept() on a SSLSocket created through 2733 # SSLContext.wrap_socket(). 2734 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 2735 context.verify_mode = ssl.CERT_REQUIRED 2736 context.load_verify_locations(CERTFILE) 2737 context.load_cert_chain(CERTFILE) 2738 server = socket.socket(socket.AF_INET) 2739 host = "127.0.0.1" 2740 port = support.bind_port(server) 2741 server = context.wrap_socket(server, server_side=True) 2742 2743 evt = threading.Event() 2744 remote = [None] 2745 peer = [None] 2746 def serve(): 2747 server.listen(5) 2748 # Block on the accept and wait on the connection to close. 2749 evt.set() 2750 remote[0], peer[0] = server.accept() 2751 remote[0].recv(1) 2752 2753 t = threading.Thread(target=serve) 2754 t.start() 2755 # Client wait until server setup and perform a connect. 2756 evt.wait() 2757 client = context.wrap_socket(socket.socket()) 2758 client.connect((host, port)) 2759 client_addr = client.getsockname() 2760 client.close() 2761 t.join() 2762 remote[0].close() 2763 server.close() 2764 # Sanity checks. 2765 self.assertIsInstance(remote[0], ssl.SSLSocket) 2766 self.assertEqual(peer[0], client_addr) 2767 2768 def test_getpeercert_enotconn(self): 2769 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 2770 with closing(context.wrap_socket(socket.socket())) as sock: 2771 with self.assertRaises(socket.error) as cm: 2772 sock.getpeercert() 2773 self.assertEqual(cm.exception.errno, errno.ENOTCONN) 2774 2775 def test_do_handshake_enotconn(self): 2776 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 2777 with closing(context.wrap_socket(socket.socket())) as sock: 2778 with self.assertRaises(socket.error) as cm: 2779 sock.do_handshake() 2780 self.assertEqual(cm.exception.errno, errno.ENOTCONN) 2781 2782 def test_default_ciphers(self): 2783 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 2784 try: 2785 # Force a set of weak ciphers on our client context 2786 context.set_ciphers("DES") 2787 except ssl.SSLError: 2788 self.skipTest("no DES cipher available") 2789 with ThreadedEchoServer(CERTFILE, 2790 ssl_version=ssl.PROTOCOL_SSLv23, 2791 chatty=False) as server: 2792 with closing(context.wrap_socket(socket.socket())) as s: 2793 with self.assertRaises(ssl.SSLError): 2794 s.connect((HOST, server.port)) 2795 self.assertIn("no shared cipher", str(server.conn_errors[0])) 2796 2797 def test_version_basic(self): 2798 """ 2799 Basic tests for SSLSocket.version(). 2800 More tests are done in the test_protocol_*() methods. 2801 """ 2802 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 2803 with ThreadedEchoServer(CERTFILE, 2804 ssl_version=ssl.PROTOCOL_TLSv1, 2805 chatty=False) as server: 2806 with closing(context.wrap_socket(socket.socket())) as s: 2807 self.assertIs(s.version(), None) 2808 s.connect((HOST, server.port)) 2809 self.assertEqual(s.version(), 'TLSv1') 2810 self.assertIs(s.version(), None) 2811 2812 @unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL") 2813 def test_default_ecdh_curve(self): 2814 # Issue #21015: elliptic curve-based Diffie Hellman key exchange 2815 # should be enabled by default on SSL contexts. 2816 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 2817 context.load_cert_chain(CERTFILE) 2818 # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled 2819 # explicitly using the 'ECCdraft' cipher alias. Otherwise, 2820 # our default cipher list should prefer ECDH-based ciphers 2821 # automatically. 2822 if ssl.OPENSSL_VERSION_INFO < (1, 0, 0): 2823 context.set_ciphers("ECCdraft:ECDH") 2824 with ThreadedEchoServer(context=context) as server: 2825 with closing(context.wrap_socket(socket.socket())) as s: 2826 s.connect((HOST, server.port)) 2827 self.assertIn("ECDH", s.cipher()[0]) 2828 2829 @unittest.skipUnless("tls-unique" in ssl.CHANNEL_BINDING_TYPES, 2830 "'tls-unique' channel binding not available") 2831 def test_tls_unique_channel_binding(self): 2832 """Test tls-unique channel binding.""" 2833 if support.verbose: 2834 sys.stdout.write("\n") 2835 2836 server = ThreadedEchoServer(CERTFILE, 2837 certreqs=ssl.CERT_NONE, 2838 ssl_version=ssl.PROTOCOL_TLSv1, 2839 cacerts=CERTFILE, 2840 chatty=True, 2841 connectionchatty=False) 2842 with server: 2843 s = ssl.wrap_socket(socket.socket(), 2844 server_side=False, 2845 certfile=CERTFILE, 2846 ca_certs=CERTFILE, 2847 cert_reqs=ssl.CERT_NONE, 2848 ssl_version=ssl.PROTOCOL_TLSv1) 2849 s.connect((HOST, server.port)) 2850 # get the data 2851 cb_data = s.get_channel_binding("tls-unique") 2852 if support.verbose: 2853 sys.stdout.write(" got channel binding data: {0!r}\n" 2854 .format(cb_data)) 2855 2856 # check if it is sane 2857 self.assertIsNotNone(cb_data) 2858 self.assertEqual(len(cb_data), 12) # True for TLSv1 2859 2860 # and compare with the peers version 2861 s.write(b"CB tls-unique\n") 2862 peer_data_repr = s.read().strip() 2863 self.assertEqual(peer_data_repr, 2864 repr(cb_data).encode("us-ascii")) 2865 s.close() 2866 2867 # now, again 2868 s = ssl.wrap_socket(socket.socket(), 2869 server_side=False, 2870 certfile=CERTFILE, 2871 ca_certs=CERTFILE, 2872 cert_reqs=ssl.CERT_NONE, 2873 ssl_version=ssl.PROTOCOL_TLSv1) 2874 s.connect((HOST, server.port)) 2875 new_cb_data = s.get_channel_binding("tls-unique") 2876 if support.verbose: 2877 sys.stdout.write(" got another channel binding data: {0!r}\n" 2878 .format(new_cb_data)) 2879 # is it really unique 2880 self.assertNotEqual(cb_data, new_cb_data) 2881 self.assertIsNotNone(cb_data) 2882 self.assertEqual(len(cb_data), 12) # True for TLSv1 2883 s.write(b"CB tls-unique\n") 2884 peer_data_repr = s.read().strip() 2885 self.assertEqual(peer_data_repr, 2886 repr(new_cb_data).encode("us-ascii")) 2887 s.close() 2888 2889 def test_compression(self): 2890 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 2891 context.load_cert_chain(CERTFILE) 2892 stats = server_params_test(context, context, 2893 chatty=True, connectionchatty=True) 2894 if support.verbose: 2895 sys.stdout.write(" got compression: {!r}\n".format(stats['compression'])) 2896 self.assertIn(stats['compression'], { None, 'ZLIB', 'RLE' }) 2897 2898 @unittest.skipUnless(hasattr(ssl, 'OP_NO_COMPRESSION'), 2899 "ssl.OP_NO_COMPRESSION needed for this test") 2900 def test_compression_disabled(self): 2901 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 2902 context.load_cert_chain(CERTFILE) 2903 context.options |= ssl.OP_NO_COMPRESSION 2904 stats = server_params_test(context, context, 2905 chatty=True, connectionchatty=True) 2906 self.assertIs(stats['compression'], None) 2907 2908 def test_dh_params(self): 2909 # Check we can get a connection with ephemeral Diffie-Hellman 2910 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 2911 context.load_cert_chain(CERTFILE) 2912 context.load_dh_params(DHFILE) 2913 context.set_ciphers("kEDH") 2914 stats = server_params_test(context, context, 2915 chatty=True, connectionchatty=True) 2916 cipher = stats["cipher"][0] 2917 parts = cipher.split("-") 2918 if "ADH" not in parts and "EDH" not in parts and "DHE" not in parts: 2919 self.fail("Non-DH cipher: " + cipher[0]) 2920 2921 def test_selected_alpn_protocol(self): 2922 # selected_alpn_protocol() is None unless ALPN is used. 2923 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 2924 context.load_cert_chain(CERTFILE) 2925 stats = server_params_test(context, context, 2926 chatty=True, connectionchatty=True) 2927 self.assertIs(stats['client_alpn_protocol'], None) 2928 2929 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support required") 2930 def test_selected_alpn_protocol_if_server_uses_alpn(self): 2931 # selected_alpn_protocol() is None unless ALPN is used by the client. 2932 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 2933 client_context.load_verify_locations(CERTFILE) 2934 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 2935 server_context.load_cert_chain(CERTFILE) 2936 server_context.set_alpn_protocols(['foo', 'bar']) 2937 stats = server_params_test(client_context, server_context, 2938 chatty=True, connectionchatty=True) 2939 self.assertIs(stats['client_alpn_protocol'], None) 2940 2941 @unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test") 2942 def test_alpn_protocols(self): 2943 server_protocols = ['foo', 'bar', 'milkshake'] 2944 protocol_tests = [ 2945 (['foo', 'bar'], 'foo'), 2946 (['bar', 'foo'], 'foo'), 2947 (['milkshake'], 'milkshake'), 2948 (['http/3.0', 'http/4.0'], None) 2949 ] 2950 for client_protocols, expected in protocol_tests: 2951 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) 2952 server_context.load_cert_chain(CERTFILE) 2953 server_context.set_alpn_protocols(server_protocols) 2954 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) 2955 client_context.load_cert_chain(CERTFILE) 2956 client_context.set_alpn_protocols(client_protocols) 2957 2958 try: 2959 stats = server_params_test(client_context, 2960 server_context, 2961 chatty=True, 2962 connectionchatty=True) 2963 except ssl.SSLError as e: 2964 stats = e 2965 2966 if expected is None and IS_OPENSSL_1_1: 2967 # OpenSSL 1.1.0 raises handshake error 2968 self.assertIsInstance(stats, ssl.SSLError) 2969 else: 2970 msg = "failed trying %s (s) and %s (c).\n" \ 2971 "was expecting %s, but got %%s from the %%s" \ 2972 % (str(server_protocols), str(client_protocols), 2973 str(expected)) 2974 client_result = stats['client_alpn_protocol'] 2975 self.assertEqual(client_result, expected, 2976 msg % (client_result, "client")) 2977 server_result = stats['server_alpn_protocols'][-1] \ 2978 if len(stats['server_alpn_protocols']) else 'nothing' 2979 self.assertEqual(server_result, expected, 2980 msg % (server_result, "server")) 2981 2982 def test_selected_npn_protocol(self): 2983 # selected_npn_protocol() is None unless NPN is used 2984 context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 2985 context.load_cert_chain(CERTFILE) 2986 stats = server_params_test(context, context, 2987 chatty=True, connectionchatty=True) 2988 self.assertIs(stats['client_npn_protocol'], None) 2989 2990 @unittest.skipUnless(ssl.HAS_NPN, "NPN support needed for this test") 2991 def test_npn_protocols(self): 2992 server_protocols = ['http/1.1', 'spdy/2'] 2993 protocol_tests = [ 2994 (['http/1.1', 'spdy/2'], 'http/1.1'), 2995 (['spdy/2', 'http/1.1'], 'http/1.1'), 2996 (['spdy/2', 'test'], 'spdy/2'), 2997 (['abc', 'def'], 'abc') 2998 ] 2999 for client_protocols, expected in protocol_tests: 3000 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 3001 server_context.load_cert_chain(CERTFILE) 3002 server_context.set_npn_protocols(server_protocols) 3003 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 3004 client_context.load_cert_chain(CERTFILE) 3005 client_context.set_npn_protocols(client_protocols) 3006 stats = server_params_test(client_context, server_context, 3007 chatty=True, connectionchatty=True) 3008 3009 msg = "failed trying %s (s) and %s (c).\n" \ 3010 "was expecting %s, but got %%s from the %%s" \ 3011 % (str(server_protocols), str(client_protocols), 3012 str(expected)) 3013 client_result = stats['client_npn_protocol'] 3014 self.assertEqual(client_result, expected, msg % (client_result, "client")) 3015 server_result = stats['server_npn_protocols'][-1] \ 3016 if len(stats['server_npn_protocols']) else 'nothing' 3017 self.assertEqual(server_result, expected, msg % (server_result, "server")) 3018 3019 def sni_contexts(self): 3020 server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 3021 server_context.load_cert_chain(SIGNED_CERTFILE) 3022 other_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 3023 other_context.load_cert_chain(SIGNED_CERTFILE2) 3024 client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1) 3025 client_context.verify_mode = ssl.CERT_REQUIRED 3026 client_context.load_verify_locations(SIGNING_CA) 3027 return server_context, other_context, client_context 3028 3029 def check_common_name(self, stats, name): 3030 cert = stats['peercert'] 3031 self.assertIn((('commonName', name),), cert['subject']) 3032 3033 @needs_sni 3034 def test_sni_callback(self): 3035 calls = [] 3036 server_context, other_context, client_context = self.sni_contexts() 3037 3038 def servername_cb(ssl_sock, server_name, initial_context): 3039 calls.append((server_name, initial_context)) 3040 if server_name is not None: 3041 ssl_sock.context = other_context 3042 server_context.set_servername_callback(servername_cb) 3043 3044 stats = server_params_test(client_context, server_context, 3045 chatty=True, 3046 sni_name='supermessage') 3047 # The hostname was fetched properly, and the certificate was 3048 # changed for the connection. 3049 self.assertEqual(calls, [("supermessage", server_context)]) 3050 # CERTFILE4 was selected 3051 self.check_common_name(stats, 'fakehostname') 3052 3053 calls = [] 3054 # The callback is called with server_name=None 3055 stats = server_params_test(client_context, server_context, 3056 chatty=True, 3057 sni_name=None) 3058 self.assertEqual(calls, [(None, server_context)]) 3059 self.check_common_name(stats, 'localhost') 3060 3061 # Check disabling the callback 3062 calls = [] 3063 server_context.set_servername_callback(None) 3064 3065 stats = server_params_test(client_context, server_context, 3066 chatty=True, 3067 sni_name='notfunny') 3068 # Certificate didn't change 3069 self.check_common_name(stats, 'localhost') 3070 self.assertEqual(calls, []) 3071 3072 @needs_sni 3073 def test_sni_callback_alert(self): 3074 # Returning a TLS alert is reflected to the connecting client 3075 server_context, other_context, client_context = self.sni_contexts() 3076 3077 def cb_returning_alert(ssl_sock, server_name, initial_context): 3078 return ssl.ALERT_DESCRIPTION_ACCESS_DENIED 3079 server_context.set_servername_callback(cb_returning_alert) 3080 3081 with self.assertRaises(ssl.SSLError) as cm: 3082 stats = server_params_test(client_context, server_context, 3083 chatty=False, 3084 sni_name='supermessage') 3085 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_ACCESS_DENIED') 3086 3087 @needs_sni 3088 def test_sni_callback_raising(self): 3089 # Raising fails the connection with a TLS handshake failure alert. 3090 server_context, other_context, client_context = self.sni_contexts() 3091 3092 def cb_raising(ssl_sock, server_name, initial_context): 3093 1.0/0.0 3094 server_context.set_servername_callback(cb_raising) 3095 3096 with self.assertRaises(ssl.SSLError) as cm, \ 3097 support.captured_stderr() as stderr: 3098 stats = server_params_test(client_context, server_context, 3099 chatty=False, 3100 sni_name='supermessage') 3101 self.assertEqual(cm.exception.reason, 'SSLV3_ALERT_HANDSHAKE_FAILURE') 3102 self.assertIn("ZeroDivisionError", stderr.getvalue()) 3103 3104 @needs_sni 3105 def test_sni_callback_wrong_return_type(self): 3106 # Returning the wrong return type terminates the TLS connection 3107 # with an internal error alert. 3108 server_context, other_context, client_context = self.sni_contexts() 3109 3110 def cb_wrong_return_type(ssl_sock, server_name, initial_context): 3111 return "foo" 3112 server_context.set_servername_callback(cb_wrong_return_type) 3113 3114 with self.assertRaises(ssl.SSLError) as cm, \ 3115 support.captured_stderr() as stderr: 3116 stats = server_params_test(client_context, server_context, 3117 chatty=False, 3118 sni_name='supermessage') 3119 self.assertEqual(cm.exception.reason, 'TLSV1_ALERT_INTERNAL_ERROR') 3120 self.assertIn("TypeError", stderr.getvalue()) 3121 3122 def test_read_write_after_close_raises_valuerror(self): 3123 context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) 3124 context.verify_mode = ssl.CERT_REQUIRED 3125 context.load_verify_locations(CERTFILE) 3126 context.load_cert_chain(CERTFILE) 3127 server = ThreadedEchoServer(context=context, chatty=False) 3128 3129 with server: 3130 s = context.wrap_socket(socket.socket()) 3131 s.connect((HOST, server.port)) 3132 s.close() 3133 3134 self.assertRaises(ValueError, s.read, 1024) 3135 self.assertRaises(ValueError, s.write, b'hello') 3136 3137 3138def test_main(verbose=False): 3139 if support.verbose: 3140 plats = { 3141 'Linux': platform.linux_distribution, 3142 'Mac': platform.mac_ver, 3143 'Windows': platform.win32_ver, 3144 } 3145 for name, func in plats.items(): 3146 plat = func() 3147 if plat and plat[0]: 3148 plat = '%s %r' % (name, plat) 3149 break 3150 else: 3151 plat = repr(platform.platform()) 3152 print("test_ssl: testing with %r %r" % 3153 (ssl.OPENSSL_VERSION, ssl.OPENSSL_VERSION_INFO)) 3154 print(" under %s" % plat) 3155 print(" HAS_SNI = %r" % ssl.HAS_SNI) 3156 print(" OP_ALL = 0x%8x" % ssl.OP_ALL) 3157 try: 3158 print(" OP_NO_TLSv1_1 = 0x%8x" % ssl.OP_NO_TLSv1_1) 3159 except AttributeError: 3160 pass 3161 3162 for filename in [ 3163 CERTFILE, REMOTE_ROOT_CERT, BYTES_CERTFILE, 3164 ONLYCERT, ONLYKEY, BYTES_ONLYCERT, BYTES_ONLYKEY, 3165 SIGNED_CERTFILE, SIGNED_CERTFILE2, SIGNING_CA, 3166 BADCERT, BADKEY, EMPTYCERT]: 3167 if not os.path.exists(filename): 3168 raise support.TestFailed("Can't read certificate file %r" % filename) 3169 3170 tests = [ContextTests, BasicTests, BasicSocketTests, SSLErrorTests] 3171 3172 if support.is_resource_enabled('network'): 3173 tests.append(NetworkedTests) 3174 3175 if _have_threads: 3176 thread_info = support.threading_setup() 3177 if thread_info: 3178 tests.append(ThreadedTests) 3179 3180 try: 3181 support.run_unittest(*tests) 3182 finally: 3183 if _have_threads: 3184 support.threading_cleanup(*thread_info) 3185 3186if __name__ == "__main__": 3187 test_main() 3188