1# This file is dual licensed under the terms of the Apache License, Version 2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 3# for complete details. 4 5from __future__ import absolute_import, division, print_function 6 7import collections 8import contextlib 9import itertools 10import warnings 11from contextlib import contextmanager 12 13import six 14from six.moves import range 15 16from cryptography import utils, x509 17from cryptography.exceptions import UnsupportedAlgorithm, _Reasons 18from cryptography.hazmat._der import ( 19 INTEGER, 20 NULL, 21 SEQUENCE, 22 encode_der, 23 encode_der_integer, 24) 25from cryptography.hazmat.backends.interfaces import ( 26 CMACBackend, 27 CipherBackend, 28 DERSerializationBackend, 29 DHBackend, 30 DSABackend, 31 EllipticCurveBackend, 32 HMACBackend, 33 HashBackend, 34 PBKDF2HMACBackend, 35 PEMSerializationBackend, 36 RSABackend, 37 ScryptBackend, 38 X509Backend, 39) 40from cryptography.hazmat.backends.openssl import aead 41from cryptography.hazmat.backends.openssl.ciphers import _CipherContext 42from cryptography.hazmat.backends.openssl.cmac import _CMACContext 43from cryptography.hazmat.backends.openssl.decode_asn1 import ( 44 _CRL_ENTRY_REASON_ENUM_TO_CODE, 45 _CRL_EXTENSION_HANDLERS, 46 _EXTENSION_HANDLERS_BASE, 47 _EXTENSION_HANDLERS_SCT, 48 _OCSP_BASICRESP_EXTENSION_HANDLERS, 49 _OCSP_REQ_EXTENSION_HANDLERS, 50 _OCSP_SINGLERESP_EXTENSION_HANDLERS_SCT, 51 _REVOKED_EXTENSION_HANDLERS, 52 _X509ExtensionParser, 53) 54from cryptography.hazmat.backends.openssl.dh import ( 55 _DHParameters, 56 _DHPrivateKey, 57 _DHPublicKey, 58 _dh_params_dup, 59) 60from cryptography.hazmat.backends.openssl.dsa import ( 61 _DSAParameters, 62 _DSAPrivateKey, 63 _DSAPublicKey, 64) 65from cryptography.hazmat.backends.openssl.ec import ( 66 _EllipticCurvePrivateKey, 67 _EllipticCurvePublicKey, 68) 69from cryptography.hazmat.backends.openssl.ed25519 import ( 70 _Ed25519PrivateKey, 71 _Ed25519PublicKey, 72) 73from 
cryptography.hazmat.backends.openssl.ed448 import ( 74 _ED448_KEY_SIZE, 75 _Ed448PrivateKey, 76 _Ed448PublicKey, 77) 78from cryptography.hazmat.backends.openssl.encode_asn1 import ( 79 _CRL_ENTRY_EXTENSION_ENCODE_HANDLERS, 80 _CRL_EXTENSION_ENCODE_HANDLERS, 81 _EXTENSION_ENCODE_HANDLERS, 82 _OCSP_BASICRESP_EXTENSION_ENCODE_HANDLERS, 83 _OCSP_REQUEST_EXTENSION_ENCODE_HANDLERS, 84 _encode_asn1_int_gc, 85 _encode_asn1_str_gc, 86 _encode_name_gc, 87 _txt2obj_gc, 88) 89from cryptography.hazmat.backends.openssl.hashes import _HashContext 90from cryptography.hazmat.backends.openssl.hmac import _HMACContext 91from cryptography.hazmat.backends.openssl.ocsp import ( 92 _OCSPRequest, 93 _OCSPResponse, 94) 95from cryptography.hazmat.backends.openssl.poly1305 import ( 96 _POLY1305_KEY_SIZE, 97 _Poly1305Context, 98) 99from cryptography.hazmat.backends.openssl.rsa import ( 100 _RSAPrivateKey, 101 _RSAPublicKey, 102) 103from cryptography.hazmat.backends.openssl.x25519 import ( 104 _X25519PrivateKey, 105 _X25519PublicKey, 106) 107from cryptography.hazmat.backends.openssl.x448 import ( 108 _X448PrivateKey, 109 _X448PublicKey, 110) 111from cryptography.hazmat.backends.openssl.x509 import ( 112 _Certificate, 113 _CertificateRevocationList, 114 _CertificateSigningRequest, 115 _RevokedCertificate, 116) 117from cryptography.hazmat.bindings.openssl import binding 118from cryptography.hazmat.primitives import hashes, serialization 119from cryptography.hazmat.primitives.asymmetric import ( 120 dh, 121 dsa, 122 ec, 123 ed25519, 124 ed448, 125 rsa, 126) 127from cryptography.hazmat.primitives.asymmetric.padding import ( 128 MGF1, 129 OAEP, 130 PKCS1v15, 131 PSS, 132) 133from cryptography.hazmat.primitives.ciphers.algorithms import ( 134 AES, 135 ARC4, 136 Blowfish, 137 CAST5, 138 Camellia, 139 ChaCha20, 140 IDEA, 141 SEED, 142 TripleDES, 143) 144from cryptography.hazmat.primitives.ciphers.modes import ( 145 CBC, 146 CFB, 147 CFB8, 148 CTR, 149 ECB, 150 GCM, 151 OFB, 152 XTS, 153) 154from 
cryptography.hazmat.primitives.kdf import scrypt 155from cryptography.hazmat.primitives.serialization import pkcs7, ssh 156from cryptography.x509 import ocsp 157 158 159_MemoryBIO = collections.namedtuple("_MemoryBIO", ["bio", "char_ptr"]) 160 161 162# Not actually supported, just used as a marker for some serialization tests. 163class _RC2(object): 164 pass 165 166 167@utils.register_interface(CipherBackend) 168@utils.register_interface(CMACBackend) 169@utils.register_interface(DERSerializationBackend) 170@utils.register_interface(DHBackend) 171@utils.register_interface(DSABackend) 172@utils.register_interface(EllipticCurveBackend) 173@utils.register_interface(HashBackend) 174@utils.register_interface(HMACBackend) 175@utils.register_interface(PBKDF2HMACBackend) 176@utils.register_interface(RSABackend) 177@utils.register_interface(PEMSerializationBackend) 178@utils.register_interface(X509Backend) 179@utils.register_interface_if( 180 binding.Binding().lib.Cryptography_HAS_SCRYPT, ScryptBackend 181) 182class Backend(object): 183 """ 184 OpenSSL API binding interfaces. 185 """ 186 187 name = "openssl" 188 189 # FIPS has opinions about acceptable algorithms and key sizes, but the 190 # disallowed algorithms are still present in OpenSSL. They just error if 191 # you try to use them. To avoid that we allowlist the algorithms in 192 # FIPS 140-3. This isn't ideal, but FIPS 140-3 is trash so here we are. 
193 _fips_aead = { 194 b"aes-128-ccm", 195 b"aes-192-ccm", 196 b"aes-256-ccm", 197 b"aes-128-gcm", 198 b"aes-192-gcm", 199 b"aes-256-gcm", 200 } 201 _fips_ciphers = (AES, TripleDES) 202 _fips_hashes = ( 203 hashes.SHA1, 204 hashes.SHA224, 205 hashes.SHA256, 206 hashes.SHA384, 207 hashes.SHA512, 208 hashes.SHA512_224, 209 hashes.SHA512_256, 210 hashes.SHA3_224, 211 hashes.SHA3_256, 212 hashes.SHA3_384, 213 hashes.SHA3_512, 214 hashes.SHAKE128, 215 hashes.SHAKE256, 216 ) 217 _fips_rsa_min_key_size = 2048 218 _fips_rsa_min_public_exponent = 65537 219 _fips_dsa_min_modulus = 1 << 2048 220 _fips_dh_min_key_size = 2048 221 _fips_dh_min_modulus = 1 << _fips_dh_min_key_size 222 223 def __init__(self): 224 self._binding = binding.Binding() 225 self._ffi = self._binding.ffi 226 self._lib = self._binding.lib 227 self._fips_enabled = self._is_fips_enabled() 228 229 self._cipher_registry = {} 230 self._register_default_ciphers() 231 self._register_x509_ext_parsers() 232 self._register_x509_encoders() 233 if self._fips_enabled and self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE: 234 warnings.warn( 235 "OpenSSL FIPS mode is enabled. Can't enable DRBG fork safety.", 236 UserWarning, 237 ) 238 else: 239 self.activate_osrandom_engine() 240 self._dh_types = [self._lib.EVP_PKEY_DH] 241 if self._lib.Cryptography_HAS_EVP_PKEY_DHX: 242 self._dh_types.append(self._lib.EVP_PKEY_DHX) 243 244 def openssl_assert(self, ok, errors=None): 245 return binding._openssl_assert(self._lib, ok, errors=errors) 246 247 def _is_fips_enabled(self): 248 fips_mode = getattr(self._lib, "FIPS_mode", lambda: 0) 249 mode = fips_mode() 250 if mode == 0: 251 # OpenSSL without FIPS pushes an error on the error stack 252 self._lib.ERR_clear_error() 253 return bool(mode) 254 255 def activate_builtin_random(self): 256 if self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE: 257 # Obtain a new structural reference. 
258 e = self._lib.ENGINE_get_default_RAND() 259 if e != self._ffi.NULL: 260 self._lib.ENGINE_unregister_RAND(e) 261 # Reset the RNG to use the built-in. 262 res = self._lib.RAND_set_rand_method(self._ffi.NULL) 263 self.openssl_assert(res == 1) 264 # decrement the structural reference from get_default_RAND 265 res = self._lib.ENGINE_finish(e) 266 self.openssl_assert(res == 1) 267 268 @contextlib.contextmanager 269 def _get_osurandom_engine(self): 270 # Fetches an engine by id and returns it. This creates a structural 271 # reference. 272 e = self._lib.ENGINE_by_id(self._lib.Cryptography_osrandom_engine_id) 273 self.openssl_assert(e != self._ffi.NULL) 274 # Initialize the engine for use. This adds a functional reference. 275 res = self._lib.ENGINE_init(e) 276 self.openssl_assert(res == 1) 277 278 try: 279 yield e 280 finally: 281 # Decrement the structural ref incremented by ENGINE_by_id. 282 res = self._lib.ENGINE_free(e) 283 self.openssl_assert(res == 1) 284 # Decrement the functional ref incremented by ENGINE_init. 285 res = self._lib.ENGINE_finish(e) 286 self.openssl_assert(res == 1) 287 288 def activate_osrandom_engine(self): 289 if self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE: 290 # Unregister and free the current engine. 291 self.activate_builtin_random() 292 with self._get_osurandom_engine() as e: 293 # Set the engine as the default RAND provider. 
294 res = self._lib.ENGINE_set_default_RAND(e) 295 self.openssl_assert(res == 1) 296 # Reset the RNG to use the engine 297 res = self._lib.RAND_set_rand_method(self._ffi.NULL) 298 self.openssl_assert(res == 1) 299 300 def osrandom_engine_implementation(self): 301 buf = self._ffi.new("char[]", 64) 302 with self._get_osurandom_engine() as e: 303 res = self._lib.ENGINE_ctrl_cmd( 304 e, b"get_implementation", len(buf), buf, self._ffi.NULL, 0 305 ) 306 self.openssl_assert(res > 0) 307 return self._ffi.string(buf).decode("ascii") 308 309 def openssl_version_text(self): 310 """ 311 Friendly string name of the loaded OpenSSL library. This is not 312 necessarily the same version as it was compiled against. 313 314 Example: OpenSSL 1.1.1d 10 Sep 2019 315 """ 316 return self._ffi.string( 317 self._lib.OpenSSL_version(self._lib.OPENSSL_VERSION) 318 ).decode("ascii") 319 320 def openssl_version_number(self): 321 return self._lib.OpenSSL_version_num() 322 323 def create_hmac_ctx(self, key, algorithm): 324 return _HMACContext(self, key, algorithm) 325 326 def _evp_md_from_algorithm(self, algorithm): 327 if algorithm.name == "blake2b" or algorithm.name == "blake2s": 328 alg = "{}{}".format( 329 algorithm.name, algorithm.digest_size * 8 330 ).encode("ascii") 331 else: 332 alg = algorithm.name.encode("ascii") 333 334 evp_md = self._lib.EVP_get_digestbyname(alg) 335 return evp_md 336 337 def _evp_md_non_null_from_algorithm(self, algorithm): 338 evp_md = self._evp_md_from_algorithm(algorithm) 339 self.openssl_assert(evp_md != self._ffi.NULL) 340 return evp_md 341 342 def hash_supported(self, algorithm): 343 if self._fips_enabled and not isinstance(algorithm, self._fips_hashes): 344 return False 345 346 evp_md = self._evp_md_from_algorithm(algorithm) 347 return evp_md != self._ffi.NULL 348 349 def hmac_supported(self, algorithm): 350 return self.hash_supported(algorithm) 351 352 def create_hash_ctx(self, algorithm): 353 return _HashContext(self, algorithm) 354 355 def 
cipher_supported(self, cipher, mode): 356 if self._fips_enabled and not isinstance(cipher, self._fips_ciphers): 357 return False 358 try: 359 adapter = self._cipher_registry[type(cipher), type(mode)] 360 except KeyError: 361 return False 362 evp_cipher = adapter(self, cipher, mode) 363 return self._ffi.NULL != evp_cipher 364 365 def register_cipher_adapter(self, cipher_cls, mode_cls, adapter): 366 if (cipher_cls, mode_cls) in self._cipher_registry: 367 raise ValueError( 368 "Duplicate registration for: {} {}.".format( 369 cipher_cls, mode_cls 370 ) 371 ) 372 self._cipher_registry[cipher_cls, mode_cls] = adapter 373 374 def _register_default_ciphers(self): 375 for mode_cls in [CBC, CTR, ECB, OFB, CFB, CFB8, GCM]: 376 self.register_cipher_adapter( 377 AES, 378 mode_cls, 379 GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}"), 380 ) 381 for mode_cls in [CBC, CTR, ECB, OFB, CFB]: 382 self.register_cipher_adapter( 383 Camellia, 384 mode_cls, 385 GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}"), 386 ) 387 for mode_cls in [CBC, CFB, CFB8, OFB]: 388 self.register_cipher_adapter( 389 TripleDES, mode_cls, GetCipherByName("des-ede3-{mode.name}") 390 ) 391 self.register_cipher_adapter( 392 TripleDES, ECB, GetCipherByName("des-ede3") 393 ) 394 for mode_cls in [CBC, CFB, OFB, ECB]: 395 self.register_cipher_adapter( 396 Blowfish, mode_cls, GetCipherByName("bf-{mode.name}") 397 ) 398 for mode_cls in [CBC, CFB, OFB, ECB]: 399 self.register_cipher_adapter( 400 SEED, mode_cls, GetCipherByName("seed-{mode.name}") 401 ) 402 for cipher_cls, mode_cls in itertools.product( 403 [CAST5, IDEA], 404 [CBC, OFB, CFB, ECB], 405 ): 406 self.register_cipher_adapter( 407 cipher_cls, 408 mode_cls, 409 GetCipherByName("{cipher.name}-{mode.name}"), 410 ) 411 self.register_cipher_adapter(ARC4, type(None), GetCipherByName("rc4")) 412 # We don't actually support RC2, this is just used by some tests. 
413 self.register_cipher_adapter(_RC2, type(None), GetCipherByName("rc2")) 414 self.register_cipher_adapter( 415 ChaCha20, type(None), GetCipherByName("chacha20") 416 ) 417 self.register_cipher_adapter(AES, XTS, _get_xts_cipher) 418 419 def _register_x509_ext_parsers(self): 420 ext_handlers = _EXTENSION_HANDLERS_BASE.copy() 421 # All revoked extensions are valid single response extensions, see: 422 # https://tools.ietf.org/html/rfc6960#section-4.4.5 423 singleresp_handlers = _REVOKED_EXTENSION_HANDLERS.copy() 424 425 if self._lib.Cryptography_HAS_SCT: 426 ext_handlers.update(_EXTENSION_HANDLERS_SCT) 427 singleresp_handlers.update(_OCSP_SINGLERESP_EXTENSION_HANDLERS_SCT) 428 429 self._certificate_extension_parser = _X509ExtensionParser( 430 self, 431 ext_count=self._lib.X509_get_ext_count, 432 get_ext=self._lib.X509_get_ext, 433 handlers=ext_handlers, 434 ) 435 self._csr_extension_parser = _X509ExtensionParser( 436 self, 437 ext_count=self._lib.sk_X509_EXTENSION_num, 438 get_ext=self._lib.sk_X509_EXTENSION_value, 439 handlers=ext_handlers, 440 ) 441 self._revoked_cert_extension_parser = _X509ExtensionParser( 442 self, 443 ext_count=self._lib.X509_REVOKED_get_ext_count, 444 get_ext=self._lib.X509_REVOKED_get_ext, 445 handlers=_REVOKED_EXTENSION_HANDLERS, 446 ) 447 self._crl_extension_parser = _X509ExtensionParser( 448 self, 449 ext_count=self._lib.X509_CRL_get_ext_count, 450 get_ext=self._lib.X509_CRL_get_ext, 451 handlers=_CRL_EXTENSION_HANDLERS, 452 ) 453 self._ocsp_req_ext_parser = _X509ExtensionParser( 454 self, 455 ext_count=self._lib.OCSP_REQUEST_get_ext_count, 456 get_ext=self._lib.OCSP_REQUEST_get_ext, 457 handlers=_OCSP_REQ_EXTENSION_HANDLERS, 458 ) 459 self._ocsp_basicresp_ext_parser = _X509ExtensionParser( 460 self, 461 ext_count=self._lib.OCSP_BASICRESP_get_ext_count, 462 get_ext=self._lib.OCSP_BASICRESP_get_ext, 463 handlers=_OCSP_BASICRESP_EXTENSION_HANDLERS, 464 ) 465 self._ocsp_singleresp_ext_parser = _X509ExtensionParser( 466 self, 467 
ext_count=self._lib.OCSP_SINGLERESP_get_ext_count, 468 get_ext=self._lib.OCSP_SINGLERESP_get_ext, 469 handlers=singleresp_handlers, 470 ) 471 472 def _register_x509_encoders(self): 473 self._extension_encode_handlers = _EXTENSION_ENCODE_HANDLERS.copy() 474 self._crl_extension_encode_handlers = ( 475 _CRL_EXTENSION_ENCODE_HANDLERS.copy() 476 ) 477 self._crl_entry_extension_encode_handlers = ( 478 _CRL_ENTRY_EXTENSION_ENCODE_HANDLERS.copy() 479 ) 480 self._ocsp_request_extension_encode_handlers = ( 481 _OCSP_REQUEST_EXTENSION_ENCODE_HANDLERS.copy() 482 ) 483 self._ocsp_basicresp_extension_encode_handlers = ( 484 _OCSP_BASICRESP_EXTENSION_ENCODE_HANDLERS.copy() 485 ) 486 487 def create_symmetric_encryption_ctx(self, cipher, mode): 488 return _CipherContext(self, cipher, mode, _CipherContext._ENCRYPT) 489 490 def create_symmetric_decryption_ctx(self, cipher, mode): 491 return _CipherContext(self, cipher, mode, _CipherContext._DECRYPT) 492 493 def pbkdf2_hmac_supported(self, algorithm): 494 return self.hmac_supported(algorithm) 495 496 def derive_pbkdf2_hmac( 497 self, algorithm, length, salt, iterations, key_material 498 ): 499 buf = self._ffi.new("unsigned char[]", length) 500 evp_md = self._evp_md_non_null_from_algorithm(algorithm) 501 key_material_ptr = self._ffi.from_buffer(key_material) 502 res = self._lib.PKCS5_PBKDF2_HMAC( 503 key_material_ptr, 504 len(key_material), 505 salt, 506 len(salt), 507 iterations, 508 evp_md, 509 length, 510 buf, 511 ) 512 self.openssl_assert(res == 1) 513 return self._ffi.buffer(buf)[:] 514 515 def _consume_errors(self): 516 return binding._consume_errors(self._lib) 517 518 def _consume_errors_with_text(self): 519 return binding._consume_errors_with_text(self._lib) 520 521 def _bn_to_int(self, bn): 522 assert bn != self._ffi.NULL 523 524 if not six.PY2: 525 # Python 3 has constant time from_bytes, so use that. 
526 bn_num_bytes = self._lib.BN_num_bytes(bn) 527 bin_ptr = self._ffi.new("unsigned char[]", bn_num_bytes) 528 bin_len = self._lib.BN_bn2bin(bn, bin_ptr) 529 # A zero length means the BN has value 0 530 self.openssl_assert(bin_len >= 0) 531 val = int.from_bytes(self._ffi.buffer(bin_ptr)[:bin_len], "big") 532 if self._lib.BN_is_negative(bn): 533 val = -val 534 return val 535 else: 536 # Under Python 2 the best we can do is hex() 537 hex_cdata = self._lib.BN_bn2hex(bn) 538 self.openssl_assert(hex_cdata != self._ffi.NULL) 539 hex_str = self._ffi.string(hex_cdata) 540 self._lib.OPENSSL_free(hex_cdata) 541 return int(hex_str, 16) 542 543 def _int_to_bn(self, num, bn=None): 544 """ 545 Converts a python integer to a BIGNUM. The returned BIGNUM will not 546 be garbage collected (to support adding them to structs that take 547 ownership of the object). Be sure to register it for GC if it will 548 be discarded after use. 549 """ 550 assert bn is None or bn != self._ffi.NULL 551 552 if bn is None: 553 bn = self._ffi.NULL 554 555 if not six.PY2: 556 # Python 3 has constant time to_bytes, so use that. 557 558 binary = num.to_bytes(int(num.bit_length() / 8.0 + 1), "big") 559 bn_ptr = self._lib.BN_bin2bn(binary, len(binary), bn) 560 self.openssl_assert(bn_ptr != self._ffi.NULL) 561 return bn_ptr 562 563 else: 564 # Under Python 2 the best we can do is hex(), [2:] removes the 0x 565 # prefix. 
566 hex_num = hex(num).rstrip("L")[2:].encode("ascii") 567 bn_ptr = self._ffi.new("BIGNUM **") 568 bn_ptr[0] = bn 569 res = self._lib.BN_hex2bn(bn_ptr, hex_num) 570 self.openssl_assert(res != 0) 571 self.openssl_assert(bn_ptr[0] != self._ffi.NULL) 572 return bn_ptr[0] 573 574 def generate_rsa_private_key(self, public_exponent, key_size): 575 rsa._verify_rsa_parameters(public_exponent, key_size) 576 577 rsa_cdata = self._lib.RSA_new() 578 self.openssl_assert(rsa_cdata != self._ffi.NULL) 579 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 580 581 bn = self._int_to_bn(public_exponent) 582 bn = self._ffi.gc(bn, self._lib.BN_free) 583 584 res = self._lib.RSA_generate_key_ex( 585 rsa_cdata, key_size, bn, self._ffi.NULL 586 ) 587 self.openssl_assert(res == 1) 588 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata) 589 590 return _RSAPrivateKey(self, rsa_cdata, evp_pkey) 591 592 def generate_rsa_parameters_supported(self, public_exponent, key_size): 593 return ( 594 public_exponent >= 3 595 and public_exponent & 1 != 0 596 and key_size >= 512 597 ) 598 599 def load_rsa_private_numbers(self, numbers): 600 rsa._check_private_key_components( 601 numbers.p, 602 numbers.q, 603 numbers.d, 604 numbers.dmp1, 605 numbers.dmq1, 606 numbers.iqmp, 607 numbers.public_numbers.e, 608 numbers.public_numbers.n, 609 ) 610 rsa_cdata = self._lib.RSA_new() 611 self.openssl_assert(rsa_cdata != self._ffi.NULL) 612 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 613 p = self._int_to_bn(numbers.p) 614 q = self._int_to_bn(numbers.q) 615 d = self._int_to_bn(numbers.d) 616 dmp1 = self._int_to_bn(numbers.dmp1) 617 dmq1 = self._int_to_bn(numbers.dmq1) 618 iqmp = self._int_to_bn(numbers.iqmp) 619 e = self._int_to_bn(numbers.public_numbers.e) 620 n = self._int_to_bn(numbers.public_numbers.n) 621 res = self._lib.RSA_set0_factors(rsa_cdata, p, q) 622 self.openssl_assert(res == 1) 623 res = self._lib.RSA_set0_key(rsa_cdata, n, e, d) 624 self.openssl_assert(res == 1) 625 res = 
self._lib.RSA_set0_crt_params(rsa_cdata, dmp1, dmq1, iqmp) 626 self.openssl_assert(res == 1) 627 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata) 628 629 return _RSAPrivateKey(self, rsa_cdata, evp_pkey) 630 631 def load_rsa_public_numbers(self, numbers): 632 rsa._check_public_key_components(numbers.e, numbers.n) 633 rsa_cdata = self._lib.RSA_new() 634 self.openssl_assert(rsa_cdata != self._ffi.NULL) 635 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 636 e = self._int_to_bn(numbers.e) 637 n = self._int_to_bn(numbers.n) 638 res = self._lib.RSA_set0_key(rsa_cdata, n, e, self._ffi.NULL) 639 self.openssl_assert(res == 1) 640 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata) 641 642 return _RSAPublicKey(self, rsa_cdata, evp_pkey) 643 644 def _create_evp_pkey_gc(self): 645 evp_pkey = self._lib.EVP_PKEY_new() 646 self.openssl_assert(evp_pkey != self._ffi.NULL) 647 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) 648 return evp_pkey 649 650 def _rsa_cdata_to_evp_pkey(self, rsa_cdata): 651 evp_pkey = self._create_evp_pkey_gc() 652 res = self._lib.EVP_PKEY_set1_RSA(evp_pkey, rsa_cdata) 653 self.openssl_assert(res == 1) 654 return evp_pkey 655 656 def _bytes_to_bio(self, data): 657 """ 658 Return a _MemoryBIO namedtuple of (BIO, char*). 659 660 The char* is the storage for the BIO and it must stay alive until the 661 BIO is finished with. 662 """ 663 data_ptr = self._ffi.from_buffer(data) 664 bio = self._lib.BIO_new_mem_buf(data_ptr, len(data)) 665 self.openssl_assert(bio != self._ffi.NULL) 666 667 return _MemoryBIO(self._ffi.gc(bio, self._lib.BIO_free), data_ptr) 668 669 def _create_mem_bio_gc(self): 670 """ 671 Creates an empty memory BIO. 672 """ 673 bio_method = self._lib.BIO_s_mem() 674 self.openssl_assert(bio_method != self._ffi.NULL) 675 bio = self._lib.BIO_new(bio_method) 676 self.openssl_assert(bio != self._ffi.NULL) 677 bio = self._ffi.gc(bio, self._lib.BIO_free) 678 return bio 679 680 def _read_mem_bio(self, bio): 681 """ 682 Reads a memory BIO. 
This only works on memory BIOs. 683 """ 684 buf = self._ffi.new("char **") 685 buf_len = self._lib.BIO_get_mem_data(bio, buf) 686 self.openssl_assert(buf_len > 0) 687 self.openssl_assert(buf[0] != self._ffi.NULL) 688 bio_data = self._ffi.buffer(buf[0], buf_len)[:] 689 return bio_data 690 691 def _evp_pkey_to_private_key(self, evp_pkey): 692 """ 693 Return the appropriate type of PrivateKey given an evp_pkey cdata 694 pointer. 695 """ 696 697 key_type = self._lib.EVP_PKEY_id(evp_pkey) 698 699 if key_type == self._lib.EVP_PKEY_RSA: 700 rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey) 701 self.openssl_assert(rsa_cdata != self._ffi.NULL) 702 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 703 return _RSAPrivateKey(self, rsa_cdata, evp_pkey) 704 elif key_type == self._lib.EVP_PKEY_DSA: 705 dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey) 706 self.openssl_assert(dsa_cdata != self._ffi.NULL) 707 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) 708 return _DSAPrivateKey(self, dsa_cdata, evp_pkey) 709 elif key_type == self._lib.EVP_PKEY_EC: 710 ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey) 711 self.openssl_assert(ec_cdata != self._ffi.NULL) 712 ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free) 713 return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey) 714 elif key_type in self._dh_types: 715 dh_cdata = self._lib.EVP_PKEY_get1_DH(evp_pkey) 716 self.openssl_assert(dh_cdata != self._ffi.NULL) 717 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free) 718 return _DHPrivateKey(self, dh_cdata, evp_pkey) 719 elif key_type == getattr(self._lib, "EVP_PKEY_ED25519", None): 720 # EVP_PKEY_ED25519 is not present in OpenSSL < 1.1.1 721 return _Ed25519PrivateKey(self, evp_pkey) 722 elif key_type == getattr(self._lib, "EVP_PKEY_X448", None): 723 # EVP_PKEY_X448 is not present in OpenSSL < 1.1.1 724 return _X448PrivateKey(self, evp_pkey) 725 elif key_type == getattr(self._lib, "EVP_PKEY_X25519", None): 726 # EVP_PKEY_X25519 is not present in OpenSSL < 1.1.0 
727 return _X25519PrivateKey(self, evp_pkey) 728 elif key_type == getattr(self._lib, "EVP_PKEY_ED448", None): 729 # EVP_PKEY_ED448 is not present in OpenSSL < 1.1.1 730 return _Ed448PrivateKey(self, evp_pkey) 731 else: 732 raise UnsupportedAlgorithm("Unsupported key type.") 733 734 def _evp_pkey_to_public_key(self, evp_pkey): 735 """ 736 Return the appropriate type of PublicKey given an evp_pkey cdata 737 pointer. 738 """ 739 740 key_type = self._lib.EVP_PKEY_id(evp_pkey) 741 742 if key_type == self._lib.EVP_PKEY_RSA: 743 rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey) 744 self.openssl_assert(rsa_cdata != self._ffi.NULL) 745 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 746 return _RSAPublicKey(self, rsa_cdata, evp_pkey) 747 elif key_type == self._lib.EVP_PKEY_DSA: 748 dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey) 749 self.openssl_assert(dsa_cdata != self._ffi.NULL) 750 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) 751 return _DSAPublicKey(self, dsa_cdata, evp_pkey) 752 elif key_type == self._lib.EVP_PKEY_EC: 753 ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey) 754 self.openssl_assert(ec_cdata != self._ffi.NULL) 755 ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free) 756 return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey) 757 elif key_type in self._dh_types: 758 dh_cdata = self._lib.EVP_PKEY_get1_DH(evp_pkey) 759 self.openssl_assert(dh_cdata != self._ffi.NULL) 760 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free) 761 return _DHPublicKey(self, dh_cdata, evp_pkey) 762 elif key_type == getattr(self._lib, "EVP_PKEY_ED25519", None): 763 # EVP_PKEY_ED25519 is not present in OpenSSL < 1.1.1 764 return _Ed25519PublicKey(self, evp_pkey) 765 elif key_type == getattr(self._lib, "EVP_PKEY_X448", None): 766 # EVP_PKEY_X448 is not present in OpenSSL < 1.1.1 767 return _X448PublicKey(self, evp_pkey) 768 elif key_type == getattr(self._lib, "EVP_PKEY_X25519", None): 769 # EVP_PKEY_X25519 is not present in OpenSSL < 1.1.0 770 return 
_X25519PublicKey(self, evp_pkey) 771 elif key_type == getattr(self._lib, "EVP_PKEY_ED448", None): 772 # EVP_PKEY_X25519 is not present in OpenSSL < 1.1.1 773 return _Ed448PublicKey(self, evp_pkey) 774 else: 775 raise UnsupportedAlgorithm("Unsupported key type.") 776 777 def _oaep_hash_supported(self, algorithm): 778 if self._lib.Cryptography_HAS_RSA_OAEP_MD: 779 return isinstance( 780 algorithm, 781 ( 782 hashes.SHA1, 783 hashes.SHA224, 784 hashes.SHA256, 785 hashes.SHA384, 786 hashes.SHA512, 787 ), 788 ) 789 else: 790 return isinstance(algorithm, hashes.SHA1) 791 792 def rsa_padding_supported(self, padding): 793 if isinstance(padding, PKCS1v15): 794 return True 795 elif isinstance(padding, PSS) and isinstance(padding._mgf, MGF1): 796 return self.hash_supported(padding._mgf._algorithm) 797 elif isinstance(padding, OAEP) and isinstance(padding._mgf, MGF1): 798 return ( 799 self._oaep_hash_supported(padding._mgf._algorithm) 800 and self._oaep_hash_supported(padding._algorithm) 801 and ( 802 (padding._label is None or len(padding._label) == 0) 803 or self._lib.Cryptography_HAS_RSA_OAEP_LABEL == 1 804 ) 805 ) 806 else: 807 return False 808 809 def generate_dsa_parameters(self, key_size): 810 if key_size not in (1024, 2048, 3072, 4096): 811 raise ValueError( 812 "Key size must be 1024, 2048, 3072, or 4096 bits." 
813 ) 814 815 ctx = self._lib.DSA_new() 816 self.openssl_assert(ctx != self._ffi.NULL) 817 ctx = self._ffi.gc(ctx, self._lib.DSA_free) 818 819 res = self._lib.DSA_generate_parameters_ex( 820 ctx, 821 key_size, 822 self._ffi.NULL, 823 0, 824 self._ffi.NULL, 825 self._ffi.NULL, 826 self._ffi.NULL, 827 ) 828 829 self.openssl_assert(res == 1) 830 831 return _DSAParameters(self, ctx) 832 833 def generate_dsa_private_key(self, parameters): 834 ctx = self._lib.DSAparams_dup(parameters._dsa_cdata) 835 self.openssl_assert(ctx != self._ffi.NULL) 836 ctx = self._ffi.gc(ctx, self._lib.DSA_free) 837 self._lib.DSA_generate_key(ctx) 838 evp_pkey = self._dsa_cdata_to_evp_pkey(ctx) 839 840 return _DSAPrivateKey(self, ctx, evp_pkey) 841 842 def generate_dsa_private_key_and_parameters(self, key_size): 843 parameters = self.generate_dsa_parameters(key_size) 844 return self.generate_dsa_private_key(parameters) 845 846 def _dsa_cdata_set_values(self, dsa_cdata, p, q, g, pub_key, priv_key): 847 res = self._lib.DSA_set0_pqg(dsa_cdata, p, q, g) 848 self.openssl_assert(res == 1) 849 res = self._lib.DSA_set0_key(dsa_cdata, pub_key, priv_key) 850 self.openssl_assert(res == 1) 851 852 def load_dsa_private_numbers(self, numbers): 853 dsa._check_dsa_private_numbers(numbers) 854 parameter_numbers = numbers.public_numbers.parameter_numbers 855 856 dsa_cdata = self._lib.DSA_new() 857 self.openssl_assert(dsa_cdata != self._ffi.NULL) 858 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) 859 860 p = self._int_to_bn(parameter_numbers.p) 861 q = self._int_to_bn(parameter_numbers.q) 862 g = self._int_to_bn(parameter_numbers.g) 863 pub_key = self._int_to_bn(numbers.public_numbers.y) 864 priv_key = self._int_to_bn(numbers.x) 865 self._dsa_cdata_set_values(dsa_cdata, p, q, g, pub_key, priv_key) 866 867 evp_pkey = self._dsa_cdata_to_evp_pkey(dsa_cdata) 868 869 return _DSAPrivateKey(self, dsa_cdata, evp_pkey) 870 871 def load_dsa_public_numbers(self, numbers): 872 
dsa._check_dsa_parameters(numbers.parameter_numbers) 873 dsa_cdata = self._lib.DSA_new() 874 self.openssl_assert(dsa_cdata != self._ffi.NULL) 875 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) 876 877 p = self._int_to_bn(numbers.parameter_numbers.p) 878 q = self._int_to_bn(numbers.parameter_numbers.q) 879 g = self._int_to_bn(numbers.parameter_numbers.g) 880 pub_key = self._int_to_bn(numbers.y) 881 priv_key = self._ffi.NULL 882 self._dsa_cdata_set_values(dsa_cdata, p, q, g, pub_key, priv_key) 883 884 evp_pkey = self._dsa_cdata_to_evp_pkey(dsa_cdata) 885 886 return _DSAPublicKey(self, dsa_cdata, evp_pkey) 887 888 def load_dsa_parameter_numbers(self, numbers): 889 dsa._check_dsa_parameters(numbers) 890 dsa_cdata = self._lib.DSA_new() 891 self.openssl_assert(dsa_cdata != self._ffi.NULL) 892 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) 893 894 p = self._int_to_bn(numbers.p) 895 q = self._int_to_bn(numbers.q) 896 g = self._int_to_bn(numbers.g) 897 res = self._lib.DSA_set0_pqg(dsa_cdata, p, q, g) 898 self.openssl_assert(res == 1) 899 900 return _DSAParameters(self, dsa_cdata) 901 902 def _dsa_cdata_to_evp_pkey(self, dsa_cdata): 903 evp_pkey = self._create_evp_pkey_gc() 904 res = self._lib.EVP_PKEY_set1_DSA(evp_pkey, dsa_cdata) 905 self.openssl_assert(res == 1) 906 return evp_pkey 907 908 def dsa_hash_supported(self, algorithm): 909 return self.hash_supported(algorithm) 910 911 def dsa_parameters_supported(self, p, q, g): 912 return True 913 914 def cmac_algorithm_supported(self, algorithm): 915 return self.cipher_supported( 916 algorithm, CBC(b"\x00" * algorithm.block_size) 917 ) 918 919 def create_cmac_ctx(self, algorithm): 920 return _CMACContext(self, algorithm) 921 922 def _x509_check_signature_params(self, private_key, algorithm): 923 if isinstance( 924 private_key, (ed25519.Ed25519PrivateKey, ed448.Ed448PrivateKey) 925 ): 926 if algorithm is not None: 927 raise ValueError( 928 "algorithm must be None when signing via ed25519 or ed448" 929 ) 930 
elif not isinstance( 931 private_key, 932 (rsa.RSAPrivateKey, dsa.DSAPrivateKey, ec.EllipticCurvePrivateKey), 933 ): 934 raise TypeError( 935 "Key must be an rsa, dsa, ec, ed25519, or ed448 private key." 936 ) 937 elif not isinstance(algorithm, hashes.HashAlgorithm): 938 raise TypeError("Algorithm must be a registered hash algorithm.") 939 elif isinstance(algorithm, hashes.MD5) and not isinstance( 940 private_key, rsa.RSAPrivateKey 941 ): 942 raise ValueError( 943 "MD5 hash algorithm is only supported with RSA keys" 944 ) 945 946 def create_x509_csr(self, builder, private_key, algorithm): 947 if not isinstance(builder, x509.CertificateSigningRequestBuilder): 948 raise TypeError("Builder type mismatch.") 949 self._x509_check_signature_params(private_key, algorithm) 950 951 # Resolve the signature algorithm. 952 evp_md = self._evp_md_x509_null_if_eddsa(private_key, algorithm) 953 954 # Create an empty request. 955 x509_req = self._lib.X509_REQ_new() 956 self.openssl_assert(x509_req != self._ffi.NULL) 957 x509_req = self._ffi.gc(x509_req, self._lib.X509_REQ_free) 958 959 # Set x509 version. 960 res = self._lib.X509_REQ_set_version(x509_req, x509.Version.v1.value) 961 self.openssl_assert(res == 1) 962 963 # Set subject name. 964 res = self._lib.X509_REQ_set_subject_name( 965 x509_req, _encode_name_gc(self, builder._subject_name) 966 ) 967 self.openssl_assert(res == 1) 968 969 # Set subject public key. 970 public_key = private_key.public_key() 971 res = self._lib.X509_REQ_set_pubkey(x509_req, public_key._evp_pkey) 972 self.openssl_assert(res == 1) 973 974 # Add extensions. 
975 sk_extension = self._lib.sk_X509_EXTENSION_new_null() 976 self.openssl_assert(sk_extension != self._ffi.NULL) 977 sk_extension = self._ffi.gc( 978 sk_extension, 979 lambda x: self._lib.sk_X509_EXTENSION_pop_free( 980 x, 981 self._ffi.addressof( 982 self._lib._original_lib, "X509_EXTENSION_free" 983 ), 984 ), 985 ) 986 # Don't GC individual extensions because the memory is owned by 987 # sk_extensions and will be freed along with it. 988 self._create_x509_extensions( 989 extensions=builder._extensions, 990 handlers=self._extension_encode_handlers, 991 x509_obj=sk_extension, 992 add_func=self._lib.sk_X509_EXTENSION_insert, 993 gc=False, 994 ) 995 res = self._lib.X509_REQ_add_extensions(x509_req, sk_extension) 996 self.openssl_assert(res == 1) 997 998 # Add attributes (all bytes encoded as ASN1 UTF8_STRING) 999 for attr_oid, attr_val in builder._attributes: 1000 obj = _txt2obj_gc(self, attr_oid.dotted_string) 1001 res = self._lib.X509_REQ_add1_attr_by_OBJ( 1002 x509_req, 1003 obj, 1004 x509.name._ASN1Type.UTF8String.value, 1005 attr_val, 1006 len(attr_val), 1007 ) 1008 self.openssl_assert(res == 1) 1009 1010 # Sign the request using the requester's private key. 1011 res = self._lib.X509_REQ_sign(x509_req, private_key._evp_pkey, evp_md) 1012 if res == 0: 1013 errors = self._consume_errors_with_text() 1014 raise ValueError("Signing failed", errors) 1015 1016 return _CertificateSigningRequest(self, x509_req) 1017 1018 def create_x509_certificate(self, builder, private_key, algorithm): 1019 if not isinstance(builder, x509.CertificateBuilder): 1020 raise TypeError("Builder type mismatch.") 1021 self._x509_check_signature_params(private_key, algorithm) 1022 1023 # Resolve the signature algorithm. 1024 evp_md = self._evp_md_x509_null_if_eddsa(private_key, algorithm) 1025 1026 # Create an empty certificate. 1027 x509_cert = self._lib.X509_new() 1028 x509_cert = self._ffi.gc(x509_cert, self._lib.X509_free) 1029 1030 # Set the x509 version. 
1031 res = self._lib.X509_set_version(x509_cert, builder._version.value) 1032 self.openssl_assert(res == 1) 1033 1034 # Set the subject's name. 1035 res = self._lib.X509_set_subject_name( 1036 x509_cert, _encode_name_gc(self, builder._subject_name) 1037 ) 1038 self.openssl_assert(res == 1) 1039 1040 # Set the subject's public key. 1041 res = self._lib.X509_set_pubkey( 1042 x509_cert, builder._public_key._evp_pkey 1043 ) 1044 self.openssl_assert(res == 1) 1045 1046 # Set the certificate serial number. 1047 serial_number = _encode_asn1_int_gc(self, builder._serial_number) 1048 res = self._lib.X509_set_serialNumber(x509_cert, serial_number) 1049 self.openssl_assert(res == 1) 1050 1051 # Set the "not before" time. 1052 self._set_asn1_time( 1053 self._lib.X509_getm_notBefore(x509_cert), builder._not_valid_before 1054 ) 1055 1056 # Set the "not after" time. 1057 self._set_asn1_time( 1058 self._lib.X509_getm_notAfter(x509_cert), builder._not_valid_after 1059 ) 1060 1061 # Add extensions. 1062 self._create_x509_extensions( 1063 extensions=builder._extensions, 1064 handlers=self._extension_encode_handlers, 1065 x509_obj=x509_cert, 1066 add_func=self._lib.X509_add_ext, 1067 gc=True, 1068 ) 1069 1070 # Set the issuer name. 1071 res = self._lib.X509_set_issuer_name( 1072 x509_cert, _encode_name_gc(self, builder._issuer_name) 1073 ) 1074 self.openssl_assert(res == 1) 1075 1076 # Sign the certificate with the issuer's private key. 
1077 res = self._lib.X509_sign(x509_cert, private_key._evp_pkey, evp_md) 1078 if res == 0: 1079 errors = self._consume_errors_with_text() 1080 raise ValueError("Signing failed", errors) 1081 1082 return _Certificate(self, x509_cert) 1083 1084 def _evp_md_x509_null_if_eddsa(self, private_key, algorithm): 1085 if isinstance( 1086 private_key, (ed25519.Ed25519PrivateKey, ed448.Ed448PrivateKey) 1087 ): 1088 # OpenSSL requires us to pass NULL for EVP_MD for ed25519/ed448 1089 return self._ffi.NULL 1090 else: 1091 return self._evp_md_non_null_from_algorithm(algorithm) 1092 1093 def _set_asn1_time(self, asn1_time, time): 1094 if time.year >= 2050: 1095 asn1_str = time.strftime("%Y%m%d%H%M%SZ").encode("ascii") 1096 else: 1097 asn1_str = time.strftime("%y%m%d%H%M%SZ").encode("ascii") 1098 res = self._lib.ASN1_TIME_set_string(asn1_time, asn1_str) 1099 self.openssl_assert(res == 1) 1100 1101 def _create_asn1_time(self, time): 1102 asn1_time = self._lib.ASN1_TIME_new() 1103 self.openssl_assert(asn1_time != self._ffi.NULL) 1104 asn1_time = self._ffi.gc(asn1_time, self._lib.ASN1_TIME_free) 1105 self._set_asn1_time(asn1_time, time) 1106 return asn1_time 1107 1108 def create_x509_crl(self, builder, private_key, algorithm): 1109 if not isinstance(builder, x509.CertificateRevocationListBuilder): 1110 raise TypeError("Builder type mismatch.") 1111 self._x509_check_signature_params(private_key, algorithm) 1112 1113 evp_md = self._evp_md_x509_null_if_eddsa(private_key, algorithm) 1114 1115 # Create an empty CRL. 1116 x509_crl = self._lib.X509_CRL_new() 1117 x509_crl = self._ffi.gc(x509_crl, self._lib.X509_CRL_free) 1118 1119 # Set the x509 CRL version. We only support v2 (integer value 1). 1120 res = self._lib.X509_CRL_set_version(x509_crl, 1) 1121 self.openssl_assert(res == 1) 1122 1123 # Set the issuer name. 
1124 res = self._lib.X509_CRL_set_issuer_name( 1125 x509_crl, _encode_name_gc(self, builder._issuer_name) 1126 ) 1127 self.openssl_assert(res == 1) 1128 1129 # Set the last update time. 1130 last_update = self._create_asn1_time(builder._last_update) 1131 res = self._lib.X509_CRL_set_lastUpdate(x509_crl, last_update) 1132 self.openssl_assert(res == 1) 1133 1134 # Set the next update time. 1135 next_update = self._create_asn1_time(builder._next_update) 1136 res = self._lib.X509_CRL_set_nextUpdate(x509_crl, next_update) 1137 self.openssl_assert(res == 1) 1138 1139 # Add extensions. 1140 self._create_x509_extensions( 1141 extensions=builder._extensions, 1142 handlers=self._crl_extension_encode_handlers, 1143 x509_obj=x509_crl, 1144 add_func=self._lib.X509_CRL_add_ext, 1145 gc=True, 1146 ) 1147 1148 # add revoked certificates 1149 for revoked_cert in builder._revoked_certificates: 1150 # Duplicating because the X509_CRL takes ownership and will free 1151 # this memory when X509_CRL_free is called. 
1152 revoked = self._lib.X509_REVOKED_dup(revoked_cert._x509_revoked) 1153 self.openssl_assert(revoked != self._ffi.NULL) 1154 res = self._lib.X509_CRL_add0_revoked(x509_crl, revoked) 1155 self.openssl_assert(res == 1) 1156 1157 res = self._lib.X509_CRL_sign(x509_crl, private_key._evp_pkey, evp_md) 1158 if res == 0: 1159 errors = self._consume_errors_with_text() 1160 raise ValueError("Signing failed", errors) 1161 1162 return _CertificateRevocationList(self, x509_crl) 1163 1164 def _create_x509_extensions( 1165 self, extensions, handlers, x509_obj, add_func, gc 1166 ): 1167 for i, extension in enumerate(extensions): 1168 x509_extension = self._create_x509_extension(handlers, extension) 1169 self.openssl_assert(x509_extension != self._ffi.NULL) 1170 1171 if gc: 1172 x509_extension = self._ffi.gc( 1173 x509_extension, self._lib.X509_EXTENSION_free 1174 ) 1175 res = add_func(x509_obj, x509_extension, i) 1176 self.openssl_assert(res >= 1) 1177 1178 def _create_raw_x509_extension(self, extension, value): 1179 obj = _txt2obj_gc(self, extension.oid.dotted_string) 1180 return self._lib.X509_EXTENSION_create_by_OBJ( 1181 self._ffi.NULL, obj, 1 if extension.critical else 0, value 1182 ) 1183 1184 def _create_x509_extension(self, handlers, extension): 1185 if isinstance(extension.value, x509.UnrecognizedExtension): 1186 value = _encode_asn1_str_gc(self, extension.value.value) 1187 return self._create_raw_x509_extension(extension, value) 1188 elif isinstance(extension.value, x509.TLSFeature): 1189 asn1 = encode_der( 1190 SEQUENCE, 1191 *[ 1192 encode_der(INTEGER, encode_der_integer(x.value)) 1193 for x in extension.value 1194 ] 1195 ) 1196 value = _encode_asn1_str_gc(self, asn1) 1197 return self._create_raw_x509_extension(extension, value) 1198 elif isinstance(extension.value, x509.PrecertPoison): 1199 value = _encode_asn1_str_gc(self, encode_der(NULL)) 1200 return self._create_raw_x509_extension(extension, value) 1201 else: 1202 try: 1203 encode = handlers[extension.oid] 1204 
except KeyError: 1205 raise NotImplementedError( 1206 "Extension not supported: {}".format(extension.oid) 1207 ) 1208 1209 ext_struct = encode(self, extension.value) 1210 nid = self._lib.OBJ_txt2nid( 1211 extension.oid.dotted_string.encode("ascii") 1212 ) 1213 self.openssl_assert(nid != self._lib.NID_undef) 1214 return self._lib.X509V3_EXT_i2d( 1215 nid, 1 if extension.critical else 0, ext_struct 1216 ) 1217 1218 def create_x509_revoked_certificate(self, builder): 1219 if not isinstance(builder, x509.RevokedCertificateBuilder): 1220 raise TypeError("Builder type mismatch.") 1221 1222 x509_revoked = self._lib.X509_REVOKED_new() 1223 self.openssl_assert(x509_revoked != self._ffi.NULL) 1224 x509_revoked = self._ffi.gc(x509_revoked, self._lib.X509_REVOKED_free) 1225 serial_number = _encode_asn1_int_gc(self, builder._serial_number) 1226 res = self._lib.X509_REVOKED_set_serialNumber( 1227 x509_revoked, serial_number 1228 ) 1229 self.openssl_assert(res == 1) 1230 rev_date = self._create_asn1_time(builder._revocation_date) 1231 res = self._lib.X509_REVOKED_set_revocationDate(x509_revoked, rev_date) 1232 self.openssl_assert(res == 1) 1233 # add CRL entry extensions 1234 self._create_x509_extensions( 1235 extensions=builder._extensions, 1236 handlers=self._crl_entry_extension_encode_handlers, 1237 x509_obj=x509_revoked, 1238 add_func=self._lib.X509_REVOKED_add_ext, 1239 gc=True, 1240 ) 1241 return _RevokedCertificate(self, None, x509_revoked) 1242 1243 def load_pem_private_key(self, data, password): 1244 return self._load_key( 1245 self._lib.PEM_read_bio_PrivateKey, 1246 self._evp_pkey_to_private_key, 1247 data, 1248 password, 1249 ) 1250 1251 def load_pem_public_key(self, data): 1252 mem_bio = self._bytes_to_bio(data) 1253 evp_pkey = self._lib.PEM_read_bio_PUBKEY( 1254 mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL 1255 ) 1256 if evp_pkey != self._ffi.NULL: 1257 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) 1258 return 
self._evp_pkey_to_public_key(evp_pkey) 1259 else: 1260 # It's not a (RSA/DSA/ECDSA) subjectPublicKeyInfo, but we still 1261 # need to check to see if it is a pure PKCS1 RSA public key (not 1262 # embedded in a subjectPublicKeyInfo) 1263 self._consume_errors() 1264 res = self._lib.BIO_reset(mem_bio.bio) 1265 self.openssl_assert(res == 1) 1266 rsa_cdata = self._lib.PEM_read_bio_RSAPublicKey( 1267 mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL 1268 ) 1269 if rsa_cdata != self._ffi.NULL: 1270 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 1271 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata) 1272 return _RSAPublicKey(self, rsa_cdata, evp_pkey) 1273 else: 1274 self._handle_key_loading_error() 1275 1276 def load_pem_parameters(self, data): 1277 mem_bio = self._bytes_to_bio(data) 1278 # only DH is supported currently 1279 dh_cdata = self._lib.PEM_read_bio_DHparams( 1280 mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL 1281 ) 1282 if dh_cdata != self._ffi.NULL: 1283 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free) 1284 return _DHParameters(self, dh_cdata) 1285 else: 1286 self._handle_key_loading_error() 1287 1288 def load_der_private_key(self, data, password): 1289 # OpenSSL has a function called d2i_AutoPrivateKey that in theory 1290 # handles this automatically, however it doesn't handle encrypted 1291 # private keys. Instead we try to load the key two different ways. 1292 # First we'll try to load it as a traditional key. 1293 bio_data = self._bytes_to_bio(data) 1294 key = self._evp_pkey_from_der_traditional_key(bio_data, password) 1295 if key: 1296 return self._evp_pkey_to_private_key(key) 1297 else: 1298 # Finally we try to load it with the method that handles encrypted 1299 # PKCS8 properly. 
1300 return self._load_key( 1301 self._lib.d2i_PKCS8PrivateKey_bio, 1302 self._evp_pkey_to_private_key, 1303 data, 1304 password, 1305 ) 1306 1307 def _evp_pkey_from_der_traditional_key(self, bio_data, password): 1308 key = self._lib.d2i_PrivateKey_bio(bio_data.bio, self._ffi.NULL) 1309 if key != self._ffi.NULL: 1310 key = self._ffi.gc(key, self._lib.EVP_PKEY_free) 1311 if password is not None: 1312 raise TypeError( 1313 "Password was given but private key is not encrypted." 1314 ) 1315 1316 return key 1317 else: 1318 self._consume_errors() 1319 return None 1320 1321 def load_der_public_key(self, data): 1322 mem_bio = self._bytes_to_bio(data) 1323 evp_pkey = self._lib.d2i_PUBKEY_bio(mem_bio.bio, self._ffi.NULL) 1324 if evp_pkey != self._ffi.NULL: 1325 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) 1326 return self._evp_pkey_to_public_key(evp_pkey) 1327 else: 1328 # It's not a (RSA/DSA/ECDSA) subjectPublicKeyInfo, but we still 1329 # need to check to see if it is a pure PKCS1 RSA public key (not 1330 # embedded in a subjectPublicKeyInfo) 1331 self._consume_errors() 1332 res = self._lib.BIO_reset(mem_bio.bio) 1333 self.openssl_assert(res == 1) 1334 rsa_cdata = self._lib.d2i_RSAPublicKey_bio( 1335 mem_bio.bio, self._ffi.NULL 1336 ) 1337 if rsa_cdata != self._ffi.NULL: 1338 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 1339 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata) 1340 return _RSAPublicKey(self, rsa_cdata, evp_pkey) 1341 else: 1342 self._handle_key_loading_error() 1343 1344 def load_der_parameters(self, data): 1345 mem_bio = self._bytes_to_bio(data) 1346 dh_cdata = self._lib.d2i_DHparams_bio(mem_bio.bio, self._ffi.NULL) 1347 if dh_cdata != self._ffi.NULL: 1348 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free) 1349 return _DHParameters(self, dh_cdata) 1350 elif self._lib.Cryptography_HAS_EVP_PKEY_DHX: 1351 # We check to see if the is dhx. 
1352 self._consume_errors() 1353 res = self._lib.BIO_reset(mem_bio.bio) 1354 self.openssl_assert(res == 1) 1355 dh_cdata = self._lib.Cryptography_d2i_DHxparams_bio( 1356 mem_bio.bio, self._ffi.NULL 1357 ) 1358 if dh_cdata != self._ffi.NULL: 1359 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free) 1360 return _DHParameters(self, dh_cdata) 1361 1362 self._handle_key_loading_error() 1363 1364 def load_pem_x509_certificate(self, data): 1365 mem_bio = self._bytes_to_bio(data) 1366 x509 = self._lib.PEM_read_bio_X509( 1367 mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL 1368 ) 1369 if x509 == self._ffi.NULL: 1370 self._consume_errors() 1371 raise ValueError( 1372 "Unable to load certificate. See https://cryptography.io/en/" 1373 "latest/faq.html#why-can-t-i-import-my-pem-file for more" 1374 " details." 1375 ) 1376 1377 x509 = self._ffi.gc(x509, self._lib.X509_free) 1378 return _Certificate(self, x509) 1379 1380 def load_der_x509_certificate(self, data): 1381 mem_bio = self._bytes_to_bio(data) 1382 x509 = self._lib.d2i_X509_bio(mem_bio.bio, self._ffi.NULL) 1383 if x509 == self._ffi.NULL: 1384 self._consume_errors() 1385 raise ValueError("Unable to load certificate") 1386 1387 x509 = self._ffi.gc(x509, self._lib.X509_free) 1388 return _Certificate(self, x509) 1389 1390 def load_pem_x509_crl(self, data): 1391 mem_bio = self._bytes_to_bio(data) 1392 x509_crl = self._lib.PEM_read_bio_X509_CRL( 1393 mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL 1394 ) 1395 if x509_crl == self._ffi.NULL: 1396 self._consume_errors() 1397 raise ValueError( 1398 "Unable to load CRL. See https://cryptography.io/en/la" 1399 "test/faq.html#why-can-t-i-import-my-pem-file for more" 1400 " details." 
1401 ) 1402 1403 x509_crl = self._ffi.gc(x509_crl, self._lib.X509_CRL_free) 1404 return _CertificateRevocationList(self, x509_crl) 1405 1406 def load_der_x509_crl(self, data): 1407 mem_bio = self._bytes_to_bio(data) 1408 x509_crl = self._lib.d2i_X509_CRL_bio(mem_bio.bio, self._ffi.NULL) 1409 if x509_crl == self._ffi.NULL: 1410 self._consume_errors() 1411 raise ValueError("Unable to load CRL") 1412 1413 x509_crl = self._ffi.gc(x509_crl, self._lib.X509_CRL_free) 1414 return _CertificateRevocationList(self, x509_crl) 1415 1416 def load_pem_x509_csr(self, data): 1417 mem_bio = self._bytes_to_bio(data) 1418 x509_req = self._lib.PEM_read_bio_X509_REQ( 1419 mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL 1420 ) 1421 if x509_req == self._ffi.NULL: 1422 self._consume_errors() 1423 raise ValueError( 1424 "Unable to load request. See https://cryptography.io/en/" 1425 "latest/faq.html#why-can-t-i-import-my-pem-file for more" 1426 " details." 1427 ) 1428 1429 x509_req = self._ffi.gc(x509_req, self._lib.X509_REQ_free) 1430 return _CertificateSigningRequest(self, x509_req) 1431 1432 def load_der_x509_csr(self, data): 1433 mem_bio = self._bytes_to_bio(data) 1434 x509_req = self._lib.d2i_X509_REQ_bio(mem_bio.bio, self._ffi.NULL) 1435 if x509_req == self._ffi.NULL: 1436 self._consume_errors() 1437 raise ValueError("Unable to load request") 1438 1439 x509_req = self._ffi.gc(x509_req, self._lib.X509_REQ_free) 1440 return _CertificateSigningRequest(self, x509_req) 1441 1442 def _load_key(self, openssl_read_func, convert_func, data, password): 1443 mem_bio = self._bytes_to_bio(data) 1444 1445 userdata = self._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *") 1446 if password is not None: 1447 utils._check_byteslike("password", password) 1448 password_ptr = self._ffi.from_buffer(password) 1449 userdata.password = password_ptr 1450 userdata.length = len(password) 1451 1452 evp_pkey = openssl_read_func( 1453 mem_bio.bio, 1454 self._ffi.NULL, 1455 self._ffi.addressof( 1456 
self._lib._original_lib, "Cryptography_pem_password_cb" 1457 ), 1458 userdata, 1459 ) 1460 1461 if evp_pkey == self._ffi.NULL: 1462 if userdata.error != 0: 1463 self._consume_errors() 1464 if userdata.error == -1: 1465 raise TypeError( 1466 "Password was not given but private key is encrypted" 1467 ) 1468 else: 1469 assert userdata.error == -2 1470 raise ValueError( 1471 "Passwords longer than {} bytes are not supported " 1472 "by this backend.".format(userdata.maxsize - 1) 1473 ) 1474 else: 1475 self._handle_key_loading_error() 1476 1477 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) 1478 1479 if password is not None and userdata.called == 0: 1480 raise TypeError( 1481 "Password was given but private key is not encrypted." 1482 ) 1483 1484 assert ( 1485 password is not None and userdata.called == 1 1486 ) or password is None 1487 1488 return convert_func(evp_pkey) 1489 1490 def _handle_key_loading_error(self): 1491 errors = self._consume_errors() 1492 1493 if not errors: 1494 raise ValueError( 1495 "Could not deserialize key data. The data may be in an " 1496 "incorrect format or it may be encrypted with an unsupported " 1497 "algorithm." 1498 ) 1499 elif errors[0]._lib_reason_match( 1500 self._lib.ERR_LIB_EVP, self._lib.EVP_R_BAD_DECRYPT 1501 ) or errors[0]._lib_reason_match( 1502 self._lib.ERR_LIB_PKCS12, 1503 self._lib.PKCS12_R_PKCS12_CIPHERFINAL_ERROR, 1504 ): 1505 raise ValueError("Bad decrypt. Incorrect password?") 1506 1507 elif any( 1508 error._lib_reason_match( 1509 self._lib.ERR_LIB_EVP, 1510 self._lib.EVP_R_UNSUPPORTED_PRIVATE_KEY_ALGORITHM, 1511 ) 1512 for error in errors 1513 ): 1514 raise ValueError("Unsupported public key algorithm.") 1515 1516 else: 1517 raise ValueError( 1518 "Could not deserialize key data. The data may be in an " 1519 "incorrect format or it may be encrypted with an unsupported " 1520 "algorithm." 
1521 ) 1522 1523 def elliptic_curve_supported(self, curve): 1524 try: 1525 curve_nid = self._elliptic_curve_to_nid(curve) 1526 except UnsupportedAlgorithm: 1527 curve_nid = self._lib.NID_undef 1528 1529 group = self._lib.EC_GROUP_new_by_curve_name(curve_nid) 1530 1531 if group == self._ffi.NULL: 1532 self._consume_errors() 1533 return False 1534 else: 1535 self.openssl_assert(curve_nid != self._lib.NID_undef) 1536 self._lib.EC_GROUP_free(group) 1537 return True 1538 1539 def elliptic_curve_signature_algorithm_supported( 1540 self, signature_algorithm, curve 1541 ): 1542 # We only support ECDSA right now. 1543 if not isinstance(signature_algorithm, ec.ECDSA): 1544 return False 1545 1546 return self.elliptic_curve_supported(curve) 1547 1548 def generate_elliptic_curve_private_key(self, curve): 1549 """ 1550 Generate a new private key on the named curve. 1551 """ 1552 1553 if self.elliptic_curve_supported(curve): 1554 ec_cdata = self._ec_key_new_by_curve(curve) 1555 1556 res = self._lib.EC_KEY_generate_key(ec_cdata) 1557 self.openssl_assert(res == 1) 1558 1559 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata) 1560 1561 return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey) 1562 else: 1563 raise UnsupportedAlgorithm( 1564 "Backend object does not support {}.".format(curve.name), 1565 _Reasons.UNSUPPORTED_ELLIPTIC_CURVE, 1566 ) 1567 1568 def load_elliptic_curve_private_numbers(self, numbers): 1569 public = numbers.public_numbers 1570 1571 ec_cdata = self._ec_key_new_by_curve(public.curve) 1572 1573 private_value = self._ffi.gc( 1574 self._int_to_bn(numbers.private_value), self._lib.BN_clear_free 1575 ) 1576 res = self._lib.EC_KEY_set_private_key(ec_cdata, private_value) 1577 self.openssl_assert(res == 1) 1578 1579 ec_cdata = self._ec_key_set_public_key_affine_coordinates( 1580 ec_cdata, public.x, public.y 1581 ) 1582 1583 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata) 1584 1585 return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey) 1586 1587 def 
load_elliptic_curve_public_numbers(self, numbers): 1588 ec_cdata = self._ec_key_new_by_curve(numbers.curve) 1589 ec_cdata = self._ec_key_set_public_key_affine_coordinates( 1590 ec_cdata, numbers.x, numbers.y 1591 ) 1592 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata) 1593 1594 return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey) 1595 1596 def load_elliptic_curve_public_bytes(self, curve, point_bytes): 1597 ec_cdata = self._ec_key_new_by_curve(curve) 1598 group = self._lib.EC_KEY_get0_group(ec_cdata) 1599 self.openssl_assert(group != self._ffi.NULL) 1600 point = self._lib.EC_POINT_new(group) 1601 self.openssl_assert(point != self._ffi.NULL) 1602 point = self._ffi.gc(point, self._lib.EC_POINT_free) 1603 with self._tmp_bn_ctx() as bn_ctx: 1604 res = self._lib.EC_POINT_oct2point( 1605 group, point, point_bytes, len(point_bytes), bn_ctx 1606 ) 1607 if res != 1: 1608 self._consume_errors() 1609 raise ValueError("Invalid public bytes for the given curve") 1610 1611 res = self._lib.EC_KEY_set_public_key(ec_cdata, point) 1612 self.openssl_assert(res == 1) 1613 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata) 1614 return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey) 1615 1616 def derive_elliptic_curve_private_key(self, private_value, curve): 1617 ec_cdata = self._ec_key_new_by_curve(curve) 1618 1619 get_func, group = self._ec_key_determine_group_get_func(ec_cdata) 1620 1621 point = self._lib.EC_POINT_new(group) 1622 self.openssl_assert(point != self._ffi.NULL) 1623 point = self._ffi.gc(point, self._lib.EC_POINT_free) 1624 1625 value = self._int_to_bn(private_value) 1626 value = self._ffi.gc(value, self._lib.BN_clear_free) 1627 1628 with self._tmp_bn_ctx() as bn_ctx: 1629 res = self._lib.EC_POINT_mul( 1630 group, point, value, self._ffi.NULL, self._ffi.NULL, bn_ctx 1631 ) 1632 self.openssl_assert(res == 1) 1633 1634 bn_x = self._lib.BN_CTX_get(bn_ctx) 1635 bn_y = self._lib.BN_CTX_get(bn_ctx) 1636 1637 res = get_func(group, point, bn_x, bn_y, bn_ctx) 1638 
self.openssl_assert(res == 1) 1639 1640 res = self._lib.EC_KEY_set_public_key(ec_cdata, point) 1641 self.openssl_assert(res == 1) 1642 private = self._int_to_bn(private_value) 1643 private = self._ffi.gc(private, self._lib.BN_clear_free) 1644 res = self._lib.EC_KEY_set_private_key(ec_cdata, private) 1645 self.openssl_assert(res == 1) 1646 1647 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata) 1648 1649 return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey) 1650 1651 def _ec_key_new_by_curve(self, curve): 1652 curve_nid = self._elliptic_curve_to_nid(curve) 1653 return self._ec_key_new_by_curve_nid(curve_nid) 1654 1655 def _ec_key_new_by_curve_nid(self, curve_nid): 1656 ec_cdata = self._lib.EC_KEY_new_by_curve_name(curve_nid) 1657 self.openssl_assert(ec_cdata != self._ffi.NULL) 1658 return self._ffi.gc(ec_cdata, self._lib.EC_KEY_free) 1659 1660 def load_der_ocsp_request(self, data): 1661 mem_bio = self._bytes_to_bio(data) 1662 request = self._lib.d2i_OCSP_REQUEST_bio(mem_bio.bio, self._ffi.NULL) 1663 if request == self._ffi.NULL: 1664 self._consume_errors() 1665 raise ValueError("Unable to load OCSP request") 1666 1667 request = self._ffi.gc(request, self._lib.OCSP_REQUEST_free) 1668 return _OCSPRequest(self, request) 1669 1670 def load_der_ocsp_response(self, data): 1671 mem_bio = self._bytes_to_bio(data) 1672 response = self._lib.d2i_OCSP_RESPONSE_bio(mem_bio.bio, self._ffi.NULL) 1673 if response == self._ffi.NULL: 1674 self._consume_errors() 1675 raise ValueError("Unable to load OCSP response") 1676 1677 response = self._ffi.gc(response, self._lib.OCSP_RESPONSE_free) 1678 return _OCSPResponse(self, response) 1679 1680 def create_ocsp_request(self, builder): 1681 ocsp_req = self._lib.OCSP_REQUEST_new() 1682 self.openssl_assert(ocsp_req != self._ffi.NULL) 1683 ocsp_req = self._ffi.gc(ocsp_req, self._lib.OCSP_REQUEST_free) 1684 cert, issuer, algorithm = builder._request 1685 evp_md = self._evp_md_non_null_from_algorithm(algorithm) 1686 certid = 
self._lib.OCSP_cert_to_id(evp_md, cert._x509, issuer._x509) 1687 self.openssl_assert(certid != self._ffi.NULL) 1688 onereq = self._lib.OCSP_request_add0_id(ocsp_req, certid) 1689 self.openssl_assert(onereq != self._ffi.NULL) 1690 self._create_x509_extensions( 1691 extensions=builder._extensions, 1692 handlers=self._ocsp_request_extension_encode_handlers, 1693 x509_obj=ocsp_req, 1694 add_func=self._lib.OCSP_REQUEST_add_ext, 1695 gc=True, 1696 ) 1697 return _OCSPRequest(self, ocsp_req) 1698 1699 def _create_ocsp_basic_response(self, builder, private_key, algorithm): 1700 self._x509_check_signature_params(private_key, algorithm) 1701 1702 basic = self._lib.OCSP_BASICRESP_new() 1703 self.openssl_assert(basic != self._ffi.NULL) 1704 basic = self._ffi.gc(basic, self._lib.OCSP_BASICRESP_free) 1705 evp_md = self._evp_md_non_null_from_algorithm( 1706 builder._response._algorithm 1707 ) 1708 certid = self._lib.OCSP_cert_to_id( 1709 evp_md, 1710 builder._response._cert._x509, 1711 builder._response._issuer._x509, 1712 ) 1713 self.openssl_assert(certid != self._ffi.NULL) 1714 certid = self._ffi.gc(certid, self._lib.OCSP_CERTID_free) 1715 if builder._response._revocation_reason is None: 1716 reason = -1 1717 else: 1718 reason = _CRL_ENTRY_REASON_ENUM_TO_CODE[ 1719 builder._response._revocation_reason 1720 ] 1721 if builder._response._revocation_time is None: 1722 rev_time = self._ffi.NULL 1723 else: 1724 rev_time = self._create_asn1_time( 1725 builder._response._revocation_time 1726 ) 1727 1728 next_update = self._ffi.NULL 1729 if builder._response._next_update is not None: 1730 next_update = self._create_asn1_time( 1731 builder._response._next_update 1732 ) 1733 1734 this_update = self._create_asn1_time(builder._response._this_update) 1735 1736 res = self._lib.OCSP_basic_add1_status( 1737 basic, 1738 certid, 1739 builder._response._cert_status.value, 1740 reason, 1741 rev_time, 1742 this_update, 1743 next_update, 1744 ) 1745 self.openssl_assert(res != self._ffi.NULL) 1746 # 
okay, now sign the basic structure 1747 evp_md = self._evp_md_x509_null_if_eddsa(private_key, algorithm) 1748 responder_cert, responder_encoding = builder._responder_id 1749 flags = self._lib.OCSP_NOCERTS 1750 if responder_encoding is ocsp.OCSPResponderEncoding.HASH: 1751 flags |= self._lib.OCSP_RESPID_KEY 1752 1753 if builder._certs is not None: 1754 for cert in builder._certs: 1755 res = self._lib.OCSP_basic_add1_cert(basic, cert._x509) 1756 self.openssl_assert(res == 1) 1757 1758 self._create_x509_extensions( 1759 extensions=builder._extensions, 1760 handlers=self._ocsp_basicresp_extension_encode_handlers, 1761 x509_obj=basic, 1762 add_func=self._lib.OCSP_BASICRESP_add_ext, 1763 gc=True, 1764 ) 1765 1766 res = self._lib.OCSP_basic_sign( 1767 basic, 1768 responder_cert._x509, 1769 private_key._evp_pkey, 1770 evp_md, 1771 self._ffi.NULL, 1772 flags, 1773 ) 1774 if res != 1: 1775 errors = self._consume_errors_with_text() 1776 raise ValueError( 1777 "Error while signing. responder_cert must be signed " 1778 "by private_key", 1779 errors, 1780 ) 1781 1782 return basic 1783 1784 def create_ocsp_response( 1785 self, response_status, builder, private_key, algorithm 1786 ): 1787 if response_status is ocsp.OCSPResponseStatus.SUCCESSFUL: 1788 basic = self._create_ocsp_basic_response( 1789 builder, private_key, algorithm 1790 ) 1791 else: 1792 basic = self._ffi.NULL 1793 1794 ocsp_resp = self._lib.OCSP_response_create( 1795 response_status.value, basic 1796 ) 1797 self.openssl_assert(ocsp_resp != self._ffi.NULL) 1798 ocsp_resp = self._ffi.gc(ocsp_resp, self._lib.OCSP_RESPONSE_free) 1799 return _OCSPResponse(self, ocsp_resp) 1800 1801 def elliptic_curve_exchange_algorithm_supported(self, algorithm, curve): 1802 return self.elliptic_curve_supported(curve) and isinstance( 1803 algorithm, ec.ECDH 1804 ) 1805 1806 def _ec_cdata_to_evp_pkey(self, ec_cdata): 1807 evp_pkey = self._create_evp_pkey_gc() 1808 res = self._lib.EVP_PKEY_set1_EC_KEY(evp_pkey, ec_cdata) 1809 
self.openssl_assert(res == 1) 1810 return evp_pkey 1811 1812 def _elliptic_curve_to_nid(self, curve): 1813 """ 1814 Get the NID for a curve name. 1815 """ 1816 1817 curve_aliases = {"secp192r1": "prime192v1", "secp256r1": "prime256v1"} 1818 1819 curve_name = curve_aliases.get(curve.name, curve.name) 1820 1821 curve_nid = self._lib.OBJ_sn2nid(curve_name.encode()) 1822 if curve_nid == self._lib.NID_undef: 1823 raise UnsupportedAlgorithm( 1824 "{} is not a supported elliptic curve".format(curve.name), 1825 _Reasons.UNSUPPORTED_ELLIPTIC_CURVE, 1826 ) 1827 return curve_nid 1828 1829 @contextmanager 1830 def _tmp_bn_ctx(self): 1831 bn_ctx = self._lib.BN_CTX_new() 1832 self.openssl_assert(bn_ctx != self._ffi.NULL) 1833 bn_ctx = self._ffi.gc(bn_ctx, self._lib.BN_CTX_free) 1834 self._lib.BN_CTX_start(bn_ctx) 1835 try: 1836 yield bn_ctx 1837 finally: 1838 self._lib.BN_CTX_end(bn_ctx) 1839 1840 def _ec_key_determine_group_get_func(self, ctx): 1841 """ 1842 Given an EC_KEY determine the group and what function is required to 1843 get point coordinates. 
1844 """ 1845 self.openssl_assert(ctx != self._ffi.NULL) 1846 1847 nid_two_field = self._lib.OBJ_sn2nid(b"characteristic-two-field") 1848 self.openssl_assert(nid_two_field != self._lib.NID_undef) 1849 1850 group = self._lib.EC_KEY_get0_group(ctx) 1851 self.openssl_assert(group != self._ffi.NULL) 1852 1853 method = self._lib.EC_GROUP_method_of(group) 1854 self.openssl_assert(method != self._ffi.NULL) 1855 1856 nid = self._lib.EC_METHOD_get_field_type(method) 1857 self.openssl_assert(nid != self._lib.NID_undef) 1858 1859 if nid == nid_two_field and self._lib.Cryptography_HAS_EC2M: 1860 get_func = self._lib.EC_POINT_get_affine_coordinates_GF2m 1861 else: 1862 get_func = self._lib.EC_POINT_get_affine_coordinates_GFp 1863 1864 assert get_func 1865 1866 return get_func, group 1867 1868 def _ec_key_set_public_key_affine_coordinates(self, ctx, x, y): 1869 """ 1870 Sets the public key point in the EC_KEY context to the affine x and y 1871 values. 1872 """ 1873 1874 if x < 0 or y < 0: 1875 raise ValueError( 1876 "Invalid EC key. Both x and y must be non-negative." 
def _private_key_bytes(
    self, encoding, format, encryption_algorithm, key, evp_pkey, cdata
):
    """
    Serialize a private key to bytes.

    ``encoding`` must be a ``serialization.Encoding`` member, ``format`` a
    ``serialization.PrivateFormat`` member and ``encryption_algorithm`` a
    ``serialization.KeySerializationEncryption`` instance, otherwise
    ``TypeError`` is raised. ``evp_pkey`` is what gets written for PKCS8,
    ``cdata`` for TraditionalOpenSSL and ``key`` for OpenSSH. Any
    unsupported combination raises ``ValueError``.
    """
    # Reject arguments of the wrong type before looking at their values.
    if not isinstance(encoding, serialization.Encoding):
        raise TypeError("encoding must be an item from the Encoding enum")
    if not isinstance(format, serialization.PrivateFormat):
        raise TypeError(
            "format must be an item from the PrivateFormat enum"
        )
    if not isinstance(
        encryption_algorithm, serialization.KeySerializationEncryption
    ):
        raise TypeError(
            "Encryption algorithm must be a KeySerializationEncryption "
            "instance"
        )

    # Derive the password; an empty password means "write unencrypted".
    unencrypted = isinstance(
        encryption_algorithm, serialization.NoEncryption
    )
    if unencrypted:
        password = b""
    elif isinstance(
        encryption_algorithm, serialization.BestAvailableEncryption
    ):
        password = encryption_algorithm.password
        if len(password) > 1023:
            raise ValueError(
                "Passwords longer than 1023 bytes are not supported by "
                "this backend"
            )
    else:
        raise ValueError("Unsupported encryption type")

    pem = serialization.Encoding.PEM
    der = serialization.Encoding.DER

    # PKCS8 + PEM/DER
    if format is serialization.PrivateFormat.PKCS8:
        if encoding is pem:
            writer = self._lib.PEM_write_bio_PKCS8PrivateKey
        elif encoding is der:
            writer = self._lib.i2d_PKCS8PrivateKey_bio
        else:
            raise ValueError("Unsupported encoding for PKCS8")
        return self._private_key_bytes_via_bio(writer, evp_pkey, password)

    # TraditionalOpenSSL + PEM/DER
    if format is serialization.PrivateFormat.TraditionalOpenSSL:
        if self._fips_enabled and not unencrypted:
            raise ValueError(
                "Encrypted traditional OpenSSL format is not "
                "supported in FIPS mode."
            )
        key_type = self._lib.EVP_PKEY_id(evp_pkey)

        if encoding is pem:
            if key_type == self._lib.EVP_PKEY_RSA:
                writer = self._lib.PEM_write_bio_RSAPrivateKey
            elif key_type == self._lib.EVP_PKEY_DSA:
                writer = self._lib.PEM_write_bio_DSAPrivateKey
            elif key_type == self._lib.EVP_PKEY_EC:
                writer = self._lib.PEM_write_bio_ECPrivateKey
            else:
                raise ValueError(
                    "Unsupported key type for TraditionalOpenSSL"
                )
            return self._private_key_bytes_via_bio(writer, cdata, password)

        if encoding is not der:
            raise ValueError("Unsupported encoding for TraditionalOpenSSL")

        # The DER writers used here take no cipher/password arguments.
        if password:
            raise ValueError(
                "Encryption is not supported for DER encoded "
                "traditional OpenSSL keys"
            )
        if key_type == self._lib.EVP_PKEY_RSA:
            writer = self._lib.i2d_RSAPrivateKey_bio
        elif key_type == self._lib.EVP_PKEY_EC:
            writer = self._lib.i2d_ECPrivateKey_bio
        elif key_type == self._lib.EVP_PKEY_DSA:
            writer = self._lib.i2d_DSAPrivateKey_bio
        else:
            raise ValueError(
                "Unsupported key type for TraditionalOpenSSL"
            )
        return self._bio_func_output(writer, cdata)

    # OpenSSH + PEM
    if format is serialization.PrivateFormat.OpenSSH:
        if encoding is not pem:
            raise ValueError(
                "OpenSSH private key format can only be used"
                " with PEM encoding"
            )
        return ssh.serialize_ssh_private_key(key, password)

    # Anything that key-specific code was supposed to handle earlier,
    # like Raw.
    raise ValueError("format is invalid with this key")
def _private_key_bytes_via_bio(self, write_bio, evp_pkey, password):
    """
    Call ``write_bio`` on ``evp_pkey`` and return the bytes it produced.

    A non-empty ``password`` selects AES-256-CBC as the cipher passed to
    ``write_bio``; an empty one passes a NULL cipher instead.
    """
    if password:
        # This is a curated value that we will update over time.
        evp_cipher = self._lib.EVP_get_cipherbyname(b"aes-256-cbc")
    else:
        evp_cipher = self._ffi.NULL

    null = self._ffi.NULL
    return self._bio_func_output(
        write_bio, evp_pkey, evp_cipher, password, len(password), null, null
    )


def _bio_func_output(self, write_bio, *args):
    """
    Run ``write_bio(bio, *args)`` against a fresh memory BIO, assert that
    it returned 1 and return the BIO's contents.
    """
    out_bio = self._create_mem_bio_gc()
    status = write_bio(out_bio, *args)
    self.openssl_assert(status == 1)
    return self._read_mem_bio(out_bio)


def _public_key_bytes(self, encoding, format, key, evp_pkey, cdata):
    """
    Serialize a public key to bytes.

    ``encoding`` must be a ``serialization.Encoding`` member and ``format``
    a ``serialization.PublicFormat`` member, otherwise ``TypeError`` is
    raised. ``evp_pkey`` is written for SubjectPublicKeyInfo, ``cdata``
    for PKCS1 and ``key`` for OpenSSH. Unsupported combinations raise
    ``ValueError``.
    """
    if not isinstance(encoding, serialization.Encoding):
        raise TypeError("encoding must be an item from the Encoding enum")
    if not isinstance(format, serialization.PublicFormat):
        raise TypeError(
            "format must be an item from the PublicFormat enum"
        )

    pem = serialization.Encoding.PEM
    der = serialization.Encoding.DER

    # SubjectPublicKeyInfo + PEM/DER
    if format is serialization.PublicFormat.SubjectPublicKeyInfo:
        if encoding is pem:
            writer = self._lib.PEM_write_bio_PUBKEY
        elif encoding is der:
            writer = self._lib.i2d_PUBKEY_bio
        else:
            raise ValueError(
                "SubjectPublicKeyInfo works only with PEM or DER encoding"
            )
        return self._bio_func_output(writer, evp_pkey)

    # PKCS1 + PEM/DER
    if format is serialization.PublicFormat.PKCS1:
        # Only RSA is supported here.
        if self._lib.EVP_PKEY_id(evp_pkey) != self._lib.EVP_PKEY_RSA:
            raise ValueError("PKCS1 format is supported only for RSA keys")

        if encoding is pem:
            writer = self._lib.PEM_write_bio_RSAPublicKey
        elif encoding is der:
            writer = self._lib.i2d_RSAPublicKey_bio
        else:
            raise ValueError("PKCS1 works only with PEM or DER encoding")
        return self._bio_func_output(writer, cdata)

    # OpenSSH + OpenSSH
    if format is serialization.PublicFormat.OpenSSH:
        if encoding is not serialization.Encoding.OpenSSH:
            raise ValueError(
                "OpenSSH format must be used with OpenSSH encoding"
            )
        return ssh.serialize_ssh_public_key(key)

    # Anything that key-specific code was supposed to handle earlier,
    # like Raw, CompressedPoint, UncompressedPoint
    raise ValueError("format is invalid with this key")
def _parameter_bytes(self, encoding, format, cdata):
    """
    Serialize the DH parameters held in ``cdata`` to PEM or DER bytes.

    When the parameters carry a ``q`` value the DHX writers are used,
    otherwise the plain DH ones. ``format`` is accepted for interface
    compatibility but is not consulted here. ``TypeError`` is raised for
    the OpenSSH encoding and for anything that is not PEM or DER.
    """
    if encoding is serialization.Encoding.OpenSSH:
        raise TypeError("OpenSSH encoding is not supported")

    # Only DH is supported here currently.
    q = self._ffi.new("BIGNUM **")
    self._lib.DH_get0_pqg(cdata, self._ffi.NULL, q, self._ffi.NULL)
    if encoding is serialization.Encoding.PEM:
        if q[0] != self._ffi.NULL:
            write_bio = self._lib.PEM_write_bio_DHxparams
        else:
            write_bio = self._lib.PEM_write_bio_DHparams
    elif encoding is serialization.Encoding.DER:
        if q[0] != self._ffi.NULL:
            write_bio = self._lib.Cryptography_i2d_DHxparams_bio
        else:
            write_bio = self._lib.i2d_DHparams_bio
    else:
        raise TypeError("encoding must be an item from the Encoding enum")

    # Share the write/assert/read sequence with the key serializers
    # instead of repeating it here.
    return self._bio_func_output(write_bio, cdata)
def generate_dh_parameters(self, generator, key_size):
    """
    Generate new DH parameters and return them as ``_DHParameters``.

    Raises ``ValueError`` if ``key_size`` is below
    ``dh._MIN_MODULUS_SIZE`` or ``generator`` is neither 2 nor 5.
    """
    if key_size < dh._MIN_MODULUS_SIZE:
        message = "DH key_size must be at least {} bits".format(
            dh._MIN_MODULUS_SIZE
        )
        raise ValueError(message)

    if generator not in (2, 5):
        raise ValueError("DH generator must be 2 or 5")

    params = self._lib.DH_new()
    self.openssl_assert(params != self._ffi.NULL)
    params = self._ffi.gc(params, self._lib.DH_free)

    status = self._lib.DH_generate_parameters_ex(
        params, key_size, generator, self._ffi.NULL
    )
    self.openssl_assert(status == 1)

    return _DHParameters(self, params)


def _dh_cdata_to_evp_pkey(self, dh_cdata):
    """
    Return a garbage-collected ``EVP_PKEY`` that has ``dh_cdata`` set on
    it via ``EVP_PKEY_set1_DH``.
    """
    pkey = self._create_evp_pkey_gc()
    self.openssl_assert(self._lib.EVP_PKEY_set1_DH(pkey, dh_cdata) == 1)
    return pkey


def generate_dh_private_key(self, parameters):
    """
    Generate a DH private key on a duplicate of ``parameters`` and return
    it as ``_DHPrivateKey``.
    """
    key_cdata = _dh_params_dup(parameters._dh_cdata, self)

    status = self._lib.DH_generate_key(key_cdata)
    self.openssl_assert(status == 1)

    return _DHPrivateKey(
        self, key_cdata, self._dh_cdata_to_evp_pkey(key_cdata)
    )


def generate_dh_private_key_and_parameters(self, generator, key_size):
    """
    Generate fresh DH parameters and a private key that uses them.
    """
    parameters = self.generate_dh_parameters(generator, key_size)
    return self.generate_dh_private_key(parameters)
def load_dh_private_numbers(self, numbers):
    """
    Build a ``_DHPrivateKey`` from DH private numbers.

    Raises ``ValueError`` when the resulting parameters fail
    ``Cryptography_DH_check`` for any reason other than the tolerated
    generator warning described below.
    """
    public_numbers = numbers.public_numbers
    parameter_numbers = public_numbers.parameter_numbers

    dh_struct = self._lib.DH_new()
    self.openssl_assert(dh_struct != self._ffi.NULL)
    dh_struct = self._ffi.gc(dh_struct, self._lib.DH_free)

    p = self._int_to_bn(parameter_numbers.p)
    g = self._int_to_bn(parameter_numbers.g)
    if parameter_numbers.q is None:
        q = self._ffi.NULL
    else:
        q = self._int_to_bn(parameter_numbers.q)

    pub_key = self._int_to_bn(public_numbers.y)
    priv_key = self._int_to_bn(numbers.x)

    status = self._lib.DH_set0_pqg(dh_struct, p, q, g)
    self.openssl_assert(status == 1)

    status = self._lib.DH_set0_key(dh_struct, pub_key, priv_key)
    self.openssl_assert(status == 1)

    codes = self._ffi.new("int[]", 1)
    status = self._lib.Cryptography_DH_check(dh_struct, codes)
    self.openssl_assert(status == 1)

    # DH_check will return DH_NOT_SUITABLE_GENERATOR if p % 24 does not
    # equal 11 when the generator is 2 (a quadratic nonresidue).
    # We want to ignore that error because p % 24 == 23 is also fine.
    # Specifically, g is then a quadratic residue. Within the context of
    # Diffie-Hellman this means it can only generate half the possible
    # values. That sounds bad, but quadratic nonresidues leak a bit of
    # the key to the attacker in exchange for having the full key space
    # available. See: https://crypto.stackexchange.com/questions/12961
    check_result = codes[0]
    if check_result != 0:
        # Tolerated only when that generator flag is the sole flag set.
        generator_flag_only = (
            parameter_numbers.g == 2
            and check_result == self._lib.DH_NOT_SUITABLE_GENERATOR
        )
        if not generator_flag_only:
            raise ValueError(
                "DH private numbers did not pass safety checks."
            )

    return _DHPrivateKey(
        self, dh_struct, self._dh_cdata_to_evp_pkey(dh_struct)
    )
def load_dh_public_numbers(self, numbers):
    """
    Build a ``_DHPublicKey`` from DH public numbers.
    """
    dh_struct = self._lib.DH_new()
    self.openssl_assert(dh_struct != self._ffi.NULL)
    dh_struct = self._ffi.gc(dh_struct, self._lib.DH_free)

    params = numbers.parameter_numbers

    p = self._int_to_bn(params.p)
    g = self._int_to_bn(params.g)
    if params.q is None:
        q = self._ffi.NULL
    else:
        q = self._int_to_bn(params.q)

    pub_key = self._int_to_bn(numbers.y)

    status = self._lib.DH_set0_pqg(dh_struct, p, q, g)
    self.openssl_assert(status == 1)

    # No private component: pass NULL for the private key.
    status = self._lib.DH_set0_key(dh_struct, pub_key, self._ffi.NULL)
    self.openssl_assert(status == 1)

    return _DHPublicKey(
        self, dh_struct, self._dh_cdata_to_evp_pkey(dh_struct)
    )


def load_dh_parameter_numbers(self, numbers):
    """
    Build ``_DHParameters`` from DH parameter numbers (p, g, optional q).
    """
    dh_struct = self._lib.DH_new()
    self.openssl_assert(dh_struct != self._ffi.NULL)
    dh_struct = self._ffi.gc(dh_struct, self._lib.DH_free)

    p = self._int_to_bn(numbers.p)
    g = self._int_to_bn(numbers.g)
    if numbers.q is None:
        q = self._ffi.NULL
    else:
        q = self._int_to_bn(numbers.q)

    status = self._lib.DH_set0_pqg(dh_struct, p, q, g)
    self.openssl_assert(status == 1)

    return _DHParameters(self, dh_struct)


def dh_parameters_supported(self, p, g, q=None):
    """
    Return ``True`` when ``Cryptography_DH_check`` reports no problem
    codes for the given parameters.
    """
    dh_struct = self._lib.DH_new()
    self.openssl_assert(dh_struct != self._ffi.NULL)
    dh_struct = self._ffi.gc(dh_struct, self._lib.DH_free)

    p_bn = self._int_to_bn(p)
    g_bn = self._int_to_bn(g)
    if q is None:
        q_bn = self._ffi.NULL
    else:
        q_bn = self._int_to_bn(q)

    status = self._lib.DH_set0_pqg(dh_struct, p_bn, q_bn, g_bn)
    self.openssl_assert(status == 1)

    codes = self._ffi.new("int[]", 1)
    status = self._lib.Cryptography_DH_check(dh_struct, codes)
    self.openssl_assert(status == 1)

    return codes[0] == 0
def dh_x942_serialization_supported(self):
    """
    Report whether the bindings were built with ``EVP_PKEY_DHX``.
    """
    return self._lib.Cryptography_HAS_EVP_PKEY_DHX == 1


def x509_name_bytes(self, name):
    """
    Return the encoding ``i2d_X509_NAME`` produces for ``name``.
    """
    encoded_name = _encode_name_gc(self, name)
    out = self._ffi.new("unsigned char **")
    length = self._lib.i2d_X509_NAME(encoded_name, out)
    self.openssl_assert(out[0] != self._ffi.NULL)
    # The buffer is released with OPENSSL_free once ``out`` is collected.
    out = self._ffi.gc(
        out, lambda pointer: self._lib.OPENSSL_free(pointer[0])
    )
    self.openssl_assert(length > 0)
    return self._ffi.buffer(out[0], length)[:]


def x25519_load_public_bytes(self, data):
    """
    Load an X25519 public key from its 32 raw bytes.

    Raises ``ValueError`` when ``data`` is not 32 bytes long.
    """
    # When we drop support for CRYPTOGRAPHY_OPENSSL_LESS_THAN_111 we can
    # switch this to EVP_PKEY_new_raw_public_key
    if len(data) != 32:
        raise ValueError("An X25519 public key is 32 bytes long")

    pkey = self._create_evp_pkey_gc()
    status = self._lib.EVP_PKEY_set_type(pkey, self._lib.NID_X25519)
    self.openssl_assert(status == 1)
    status = self._lib.EVP_PKEY_set1_tls_encodedpoint(
        pkey, data, len(data)
    )
    self.openssl_assert(status == 1)
    return _X25519PublicKey(self, pkey)


def x25519_load_private_bytes(self, data):
    """
    Load an X25519 private key from its 32 raw bytes.

    Raises ``ValueError`` when ``data`` is not 32 bytes long.
    """
    # When we drop support for CRYPTOGRAPHY_OPENSSL_LESS_THAN_111 we can
    # switch this to EVP_PKEY_new_raw_private_key and drop the
    # zeroed_bytearray garbage.
    # OpenSSL only has facilities for loading PKCS8 formatted private
    # keys using the algorithm identifiers specified in
    # https://tools.ietf.org/html/draft-ietf-curdle-pkix-09.
    # This is the standard PKCS8 prefix for a 32 byte X25519 key.
    # The form is:
    #    0:d=0  hl=2 l=  46 cons: SEQUENCE
    #    2:d=1  hl=2 l=   1 prim: INTEGER           :00
    #    5:d=1  hl=2 l=   5 cons: SEQUENCE
    #    7:d=2  hl=2 l=   3 prim: OBJECT            :1.3.101.110
    #    12:d=1  hl=2 l=  34 prim: OCTET STRING      (the key)
    # Of course there's a bit more complexity. In reality OCTET STRING
    # contains an OCTET STRING of length 32! So the last two bytes here
    # are \x04\x20, which is an OCTET STRING of length 32.
    if len(data) != 32:
        raise ValueError("An X25519 private key is 32 bytes long")

    pkcs8_prefix = b'0.\x02\x01\x000\x05\x06\x03+en\x04"\x04 '
    # Assemble prefix + key in a scratch buffer that is zeroed on exit.
    with self._zeroed_bytearray(48) as scratch:
        scratch[0:16] = pkcs8_prefix
        scratch[16:] = data
        scratch_bio = self._bytes_to_bio(scratch)
        pkey = self._lib.d2i_PrivateKey_bio(
            scratch_bio.bio, self._ffi.NULL
        )

    self.openssl_assert(pkey != self._ffi.NULL)
    pkey = self._ffi.gc(pkey, self._lib.EVP_PKEY_free)
    self.openssl_assert(
        self._lib.EVP_PKEY_id(pkey) == self._lib.EVP_PKEY_X25519
    )
    return _X25519PrivateKey(self, pkey)
def _evp_pkey_keygen_gc(self, nid):
    """
    Generate a key of the type identified by ``nid`` and return it as a
    garbage-collected ``EVP_PKEY``.
    """
    ctx = self._lib.EVP_PKEY_CTX_new_id(nid, self._ffi.NULL)
    self.openssl_assert(ctx != self._ffi.NULL)
    ctx = self._ffi.gc(ctx, self._lib.EVP_PKEY_CTX_free)

    status = self._lib.EVP_PKEY_keygen_init(ctx)
    self.openssl_assert(status == 1)

    pkey_out = self._ffi.new("EVP_PKEY **")
    status = self._lib.EVP_PKEY_keygen(ctx, pkey_out)
    self.openssl_assert(status == 1)
    self.openssl_assert(pkey_out[0] != self._ffi.NULL)
    return self._ffi.gc(pkey_out[0], self._lib.EVP_PKEY_free)


def x25519_generate_key(self):
    """
    Generate a new X25519 private key.
    """
    return _X25519PrivateKey(
        self, self._evp_pkey_keygen_gc(self._lib.NID_X25519)
    )


def x25519_supported(self):
    """
    X25519 is reported as unsupported in FIPS mode and on LibreSSL.
    """
    return not (self._fips_enabled or self._lib.CRYPTOGRAPHY_IS_LIBRESSL)


def x448_load_public_bytes(self, data):
    """
    Load an X448 public key from its 56 raw bytes.

    Raises ``ValueError`` when ``data`` is not 56 bytes long.
    """
    if len(data) != 56:
        raise ValueError("An X448 public key is 56 bytes long")

    pkey = self._lib.EVP_PKEY_new_raw_public_key(
        self._lib.NID_X448, self._ffi.NULL, data, len(data)
    )
    self.openssl_assert(pkey != self._ffi.NULL)
    return _X448PublicKey(
        self, self._ffi.gc(pkey, self._lib.EVP_PKEY_free)
    )
def x448_load_private_bytes(self, data):
    """
    Load an X448 private key from its 56 raw bytes.

    Raises ``ValueError`` when ``data`` is not 56 bytes long.
    """
    if len(data) != 56:
        raise ValueError("An X448 private key is 56 bytes long")

    data_ptr = self._ffi.from_buffer(data)
    evp_pkey = self._lib.EVP_PKEY_new_raw_private_key(
        self._lib.NID_X448, self._ffi.NULL, data_ptr, len(data)
    )
    self.openssl_assert(evp_pkey != self._ffi.NULL)
    evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
    return _X448PrivateKey(self, evp_pkey)


def x448_generate_key(self):
    """
    Generate a new X448 private key.
    """
    evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_X448)
    return _X448PrivateKey(self, evp_pkey)


def x448_supported(self):
    """
    X448 is reported as unsupported in FIPS mode and when
    ``CRYPTOGRAPHY_OPENSSL_LESS_THAN_111`` is set.
    """
    if self._fips_enabled:
        return False
    return not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111


def ed25519_supported(self):
    """
    Ed25519 is reported as unsupported in FIPS mode and when
    ``CRYPTOGRAPHY_OPENSSL_LESS_THAN_111B`` is set.
    """
    if self._fips_enabled:
        return False
    return not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111B


def ed25519_load_public_bytes(self, data):
    """
    Load an Ed25519 public key from its raw bytes.

    ``data`` is type-checked with ``utils._check_bytes``; ``ValueError``
    is raised when its length is not ``ed25519._ED25519_KEY_SIZE``.
    """
    utils._check_bytes("data", data)

    if len(data) != ed25519._ED25519_KEY_SIZE:
        raise ValueError("An Ed25519 public key is 32 bytes long")

    evp_pkey = self._lib.EVP_PKEY_new_raw_public_key(
        self._lib.NID_ED25519, self._ffi.NULL, data, len(data)
    )
    self.openssl_assert(evp_pkey != self._ffi.NULL)
    evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)

    return _Ed25519PublicKey(self, evp_pkey)


def ed25519_load_private_bytes(self, data):
    """
    Load an Ed25519 private key from its raw bytes.

    ``data`` is type-checked with ``utils._check_byteslike``;
    ``ValueError`` is raised when its length is not
    ``ed25519._ED25519_KEY_SIZE``.
    """
    # Check the type before the length, as the other Ed25519/Ed448
    # loaders do, so the type check runs first for wrong-typed input.
    utils._check_byteslike("data", data)
    if len(data) != ed25519._ED25519_KEY_SIZE:
        raise ValueError("An Ed25519 private key is 32 bytes long")

    data_ptr = self._ffi.from_buffer(data)
    evp_pkey = self._lib.EVP_PKEY_new_raw_private_key(
        self._lib.NID_ED25519, self._ffi.NULL, data_ptr, len(data)
    )
    self.openssl_assert(evp_pkey != self._ffi.NULL)
    evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)

    return _Ed25519PrivateKey(self, evp_pkey)
def ed25519_generate_key(self):
    """
    Generate a new Ed25519 private key.
    """
    return _Ed25519PrivateKey(
        self, self._evp_pkey_keygen_gc(self._lib.NID_ED25519)
    )


def ed448_supported(self):
    """
    Ed448 is reported as unsupported in FIPS mode and when
    ``CRYPTOGRAPHY_OPENSSL_LESS_THAN_111B`` is set.
    """
    return not (
        self._fips_enabled
        or self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111B
    )


def ed448_load_public_bytes(self, data):
    """
    Load an Ed448 public key from its raw bytes.

    ``data`` is type-checked with ``utils._check_bytes``; ``ValueError``
    is raised when its length is not ``_ED448_KEY_SIZE``.
    """
    utils._check_bytes("data", data)
    if len(data) != _ED448_KEY_SIZE:
        raise ValueError("An Ed448 public key is 57 bytes long")

    pkey = self._lib.EVP_PKEY_new_raw_public_key(
        self._lib.NID_ED448, self._ffi.NULL, data, len(data)
    )
    self.openssl_assert(pkey != self._ffi.NULL)
    pkey = self._ffi.gc(pkey, self._lib.EVP_PKEY_free)

    return _Ed448PublicKey(self, pkey)


def ed448_load_private_bytes(self, data):
    """
    Load an Ed448 private key from its raw bytes.

    ``data`` is type-checked with ``utils._check_byteslike``;
    ``ValueError`` is raised when its length is not ``_ED448_KEY_SIZE``.
    """
    utils._check_byteslike("data", data)
    if len(data) != _ED448_KEY_SIZE:
        raise ValueError("An Ed448 private key is 57 bytes long")

    raw_key = self._ffi.from_buffer(data)
    pkey = self._lib.EVP_PKEY_new_raw_private_key(
        self._lib.NID_ED448, self._ffi.NULL, raw_key, len(data)
    )
    self.openssl_assert(pkey != self._ffi.NULL)
    pkey = self._ffi.gc(pkey, self._lib.EVP_PKEY_free)

    return _Ed448PrivateKey(self, pkey)


def ed448_generate_key(self):
    """
    Generate a new Ed448 private key.
    """
    return _Ed448PrivateKey(
        self, self._evp_pkey_keygen_gc(self._lib.NID_ED448)
    )


def derive_scrypt(self, key_material, salt, length, n, r, p):
    """
    Derive ``length`` bytes from ``key_material`` and ``salt`` with
    ``EVP_PBE_scrypt`` using cost parameters ``n``, ``r`` and ``p``.

    Raises ``MemoryError`` (carrying the OpenSSL error text) when the
    call does not return 1.
    """
    derived = self._ffi.new("unsigned char[]", length)
    key_material_ptr = self._ffi.from_buffer(key_material)
    status = self._lib.EVP_PBE_scrypt(
        key_material_ptr,
        len(key_material),
        salt,
        len(salt),
        n,
        r,
        p,
        scrypt._MEM_LIMIT,
        derived,
        length,
    )
    if status == 1:
        return self._ffi.buffer(derived)[:]

    errors = self._consume_errors_with_text()
    # memory required formula explained here:
    # https://blog.filippo.io/the-scrypt-parameters/
    min_memory = 128 * n * r // (1024 ** 2)
    raise MemoryError(
        "Not enough memory to derive key. These parameters require"
        " {} MB of memory.".format(min_memory),
        errors,
    )
def aead_cipher_supported(self, cipher):
    """
    Report whether OpenSSL knows the AEAD cipher named by
    ``aead._aead_cipher_name(cipher)``; in FIPS mode the name must also
    be listed in ``self._fips_aead``.
    """
    name = aead._aead_cipher_name(cipher)
    if self._fips_enabled and name not in self._fips_aead:
        return False
    return self._lib.EVP_get_cipherbyname(name) != self._ffi.NULL


@contextlib.contextmanager
def _zeroed_bytearray(self, length):
    """
    Context manager yielding a fresh bytearray of ``length`` bytes that
    is overwritten with zeros when the block exits, even on error. It is
    meant for holding copies of sensitive data (ideally copied from a
    mutable buffer that the caller can erase as well).
    """
    scratch = bytearray(length)
    try:
        yield scratch
    finally:
        self._zero_data(scratch, length)


def _zero_data(self, data, length):
    """
    Overwrite the first ``length`` items of ``data`` with zero, in place.
    """
    # We clear things this way because at the moment we're not
    # sure of a better way that can guarantee it overwrites the
    # memory of a bytearray and doesn't just replace the underlying char *.
    for index in range(length):
        data[index] = 0


@contextlib.contextmanager
def _zeroed_null_terminated_buf(self, data):
    """
    Context manager yielding a null-terminated copy of ``data`` (a
    bytestring or a mutable buffer such as a bytearray), or NULL when
    ``data`` is None. The copy exists because PKCS12_parse doesn't take a
    length with its password char * and ffi.from_buffer doesn't provide
    null termination. The copied bytes are zeroed when the block exits.
    """
    if data is None:
        yield self._ffi.NULL
        return

    size = len(data)
    # One extra byte so the copy is always null-terminated.
    copy = self._ffi.new("char[]", size + 1)
    self._ffi.memmove(copy, data, size)
    try:
        yield copy
    finally:
        # Cast to a uint8_t * so we can assign by integer
        self._zero_data(self._ffi.cast("uint8_t *", copy), size)
def load_key_and_certificates_from_pkcs12(self, data, password):
    """
    Parse PKCS12 ``data`` and return ``(key, cert, additional_certs)``.

    ``key`` and ``cert`` are ``None`` when the structure does not contain
    them; ``additional_certs`` is a possibly empty list. ``password`` may
    be ``None`` or bytes-like. Raises ``ValueError`` when the data cannot
    be deserialized or cannot be parsed with the given password.
    """
    if password is not None:
        utils._check_byteslike("password", password)

    data_bio = self._bytes_to_bio(data)
    p12 = self._lib.d2i_PKCS12_bio(data_bio.bio, self._ffi.NULL)
    if p12 == self._ffi.NULL:
        self._consume_errors()
        raise ValueError("Could not deserialize PKCS12 data")
    p12 = self._ffi.gc(p12, self._lib.PKCS12_free)

    pkey_out = self._ffi.new("EVP_PKEY **")
    cert_out = self._ffi.new("X509 **")
    chain_out = self._ffi.new("Cryptography_STACK_OF_X509 **")
    with self._zeroed_null_terminated_buf(password) as password_buf:
        status = self._lib.PKCS12_parse(
            p12, password_buf, pkey_out, cert_out, chain_out
        )

    if status == 0:
        self._consume_errors()
        raise ValueError("Invalid password or PKCS12 data")

    key = None
    if pkey_out[0] != self._ffi.NULL:
        pkey = self._ffi.gc(pkey_out[0], self._lib.EVP_PKEY_free)
        key = self._evp_pkey_to_private_key(pkey)

    cert = None
    if cert_out[0] != self._ffi.NULL:
        cert = _Certificate(
            self, self._ffi.gc(cert_out[0], self._lib.X509_free)
        )

    additional_certificates = []
    if chain_out[0] != self._ffi.NULL:
        chain = self._ffi.gc(chain_out[0], self._lib.sk_X509_free)
        count = self._lib.sk_X509_num(chain_out[0])
        for index in range(count):
            entry = self._lib.sk_X509_value(chain, index)
            self.openssl_assert(entry != self._ffi.NULL)
            # Each entry gets its own X509_free finalizer.
            entry = self._ffi.gc(entry, self._lib.X509_free)
            additional_certificates.append(_Certificate(self, entry))

    return (key, cert, additional_certificates)
def serialize_key_and_certificates_to_pkcs12(
    self, name, key, cert, cas, encryption_algorithm
):
    """
    Serialize a key, a certificate and optional CA certificates into a
    PKCS12 structure and return its DER bytes.

    ``name`` is ``None`` or bytes, ``key`` and ``cert`` may be falsy to
    omit them, and ``cas`` may be ``None`` or empty. With
    ``NoEncryption`` nothing is encrypted and no password is used; with
    ``BestAvailableEncryption`` the key and certificates are encrypted
    using the algorithm's password. Any other ``encryption_algorithm``
    raises ``ValueError``.
    """
    password = None
    if name is not None:
        utils._check_bytes("name", name)

    if isinstance(encryption_algorithm, serialization.NoEncryption):
        nid_cert = -1
        nid_key = -1
        pkcs12_iter = 0
        mac_iter = 0
    elif isinstance(
        encryption_algorithm, serialization.BestAvailableEncryption
    ):
        # PKCS12 encryption is hopeless trash and can never be fixed.
        # This is the least terrible option.
        nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
        nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
        # At least we can set this higher than OpenSSL's default
        pkcs12_iter = 20000
        # mac_iter chosen for compatibility reasons, see:
        # https://www.openssl.org/docs/man1.1.1/man3/PKCS12_create.html
        # Did we mention how lousy PKCS12 encryption is?
        mac_iter = 1
        password = encryption_algorithm.password
    else:
        raise ValueError("Unsupported key encryption type")

    if cas is None or len(cas) == 0:
        sk_x509 = self._ffi.NULL
    else:
        sk_x509 = self._lib.sk_X509_new_null()
        sk_x509 = self._ffi.gc(sk_x509, self._lib.sk_X509_free)

        # reverse the list when building the stack so that they're encoded
        # in the order they were originally provided. it is a mystery
        for ca in reversed(cas):
            res = self._lib.sk_X509_push(sk_x509, ca._x509)
            # Assert through this instance, not the module-level
            # ``backend`` singleton.
            self.openssl_assert(res >= 1)

    with self._zeroed_null_terminated_buf(password) as password_buf:
        with self._zeroed_null_terminated_buf(name) as name_buf:
            p12 = self._lib.PKCS12_create(
                password_buf,
                name_buf,
                key._evp_pkey if key else self._ffi.NULL,
                cert._x509 if cert else self._ffi.NULL,
                sk_x509,
                nid_key,
                nid_cert,
                pkcs12_iter,
                mac_iter,
                0,
            )

    self.openssl_assert(p12 != self._ffi.NULL)
    p12 = self._ffi.gc(p12, self._lib.PKCS12_free)

    bio = self._create_mem_bio_gc()
    res = self._lib.i2d_PKCS12_bio(bio, p12)
    self.openssl_assert(res > 0)
    return self._read_mem_bio(bio)
def poly1305_supported(self):
    """
    Poly1305 is reported as unsupported in FIPS mode; otherwise this
    reflects the ``Cryptography_HAS_POLY1305`` binding flag.
    """
    if self._fips_enabled:
        return False
    return self._lib.Cryptography_HAS_POLY1305 == 1


def create_poly1305_ctx(self, key):
    """
    Create a ``_Poly1305Context`` for a bytes-like ``key``.

    Raises ``ValueError`` when the key length is not
    ``_POLY1305_KEY_SIZE``.
    """
    utils._check_byteslike("key", key)
    if len(key) != _POLY1305_KEY_SIZE:
        raise ValueError("A poly1305 key is 32 bytes long")

    return _Poly1305Context(self, key)


def load_pem_pkcs7_certificates(self, data):
    """
    Parse PEM encoded PKCS7 ``data`` and return the certificates in it.

    Raises ``ValueError`` when the data cannot be parsed.
    """
    utils._check_bytes("data", data)
    data_bio = self._bytes_to_bio(data)
    null = self._ffi.NULL
    p7 = self._lib.PEM_read_bio_PKCS7(data_bio.bio, null, null, null)
    if p7 == self._ffi.NULL:
        self._consume_errors()
        raise ValueError("Unable to parse PKCS7 data")

    return self._load_pkcs7_certificates(
        self._ffi.gc(p7, self._lib.PKCS7_free)
    )


def load_der_pkcs7_certificates(self, data):
    """
    Parse DER encoded PKCS7 ``data`` and return the certificates in it.

    Raises ``ValueError`` when the data cannot be parsed.
    """
    utils._check_bytes("data", data)
    data_bio = self._bytes_to_bio(data)
    p7 = self._lib.d2i_PKCS7_bio(data_bio.bio, self._ffi.NULL)
    if p7 == self._ffi.NULL:
        self._consume_errors()
        raise ValueError("Unable to parse PKCS7 data")

    return self._load_pkcs7_certificates(
        self._ffi.gc(p7, self._lib.PKCS7_free)
    )
def _load_pkcs7_certificates(self, p7):
    """
    Return the certificates of a signed PKCS7 structure as a list of
    ``_Certificate`` objects.

    Raises ``UnsupportedAlgorithm`` when the structure's type is anything
    other than ``NID_pkcs7_signed``.
    """
    nid = self._lib.OBJ_obj2nid(p7.type)
    self.openssl_assert(nid != self._lib.NID_undef)
    if nid != self._lib.NID_pkcs7_signed:
        raise UnsupportedAlgorithm(
            "Only basic signed structures are currently supported. NID"
            " for this data was {}".format(nid),
            _Reasons.UNSUPPORTED_SERIALIZATION,
        )

    cert_stack = p7.d.sign.cert
    certs = []
    for index in range(self._lib.sk_X509_num(cert_stack)):
        entry = self._lib.sk_X509_value(cert_stack, index)
        self.openssl_assert(entry != self._ffi.NULL)
        # Take a reference of our own before attaching the finalizer.
        # When OpenSSL is less than 1.1.0 up_ref returns the current
        # refcount. On 1.1.0+ it returns 1 for success.
        self.openssl_assert(self._lib.X509_up_ref(entry) >= 1)
        entry = self._ffi.gc(entry, self._lib.X509_free)
        certs.append(_Certificate(self, entry))

    return certs


def pkcs7_sign(self, builder, encoding, options):
    """
    Produce a signed PKCS7 structure from ``builder`` and return it
    serialized as SMIME, PEM or DER according to ``encoding``.

    ``builder`` supplies the data, the signers and any additional
    certificates; ``options`` is a collection of ``pkcs7.PKCS7Options``
    members that select the OpenSSL flags used.
    """
    data_bio = self._bytes_to_bio(builder._data)
    init_flags = self._lib.PKCS7_PARTIAL
    final_flags = 0

    if len(builder._additional_certs) == 0:
        extra_certs = self._ffi.NULL
    else:
        extra_certs = self._lib.sk_X509_new_null()
        extra_certs = self._ffi.gc(extra_certs, self._lib.sk_X509_free)
        for extra in builder._additional_certs:
            pushed = self._lib.sk_X509_push(extra_certs, extra._x509)
            self.openssl_assert(pushed >= 1)

    if pkcs7.PKCS7Options.DetachedSignature in options:
        # Don't embed the data in the PKCS7 structure
        init_flags |= self._lib.PKCS7_DETACHED
        final_flags |= self._lib.PKCS7_DETACHED

    # This just inits a structure for us. However, there
    # are flags we need to set, joy.
    null = self._ffi.NULL
    p7 = self._lib.PKCS7_sign(null, null, extra_certs, null, init_flags)
    self.openssl_assert(p7 != self._ffi.NULL)
    p7 = self._ffi.gc(p7, self._lib.PKCS7_free)

    # These flags are configurable on a per-signature basis
    # but we've deliberately chosen to make the API only allow
    # setting it across all signatures for now.
    signer_flags = 0
    if pkcs7.PKCS7Options.NoCapabilities in options:
        signer_flags |= self._lib.PKCS7_NOSMIMECAP
    elif pkcs7.PKCS7Options.NoAttributes in options:
        signer_flags |= self._lib.PKCS7_NOATTR

    if pkcs7.PKCS7Options.NoCerts in options:
        signer_flags |= self._lib.PKCS7_NOCERTS

    for certificate, private_key, hash_algorithm in builder._signers:
        md = self._evp_md_non_null_from_algorithm(hash_algorithm)
        signer_info = self._lib.PKCS7_sign_add_signer(
            p7, certificate._x509, private_key._evp_pkey, md, signer_flags
        )
        self.openssl_assert(signer_info != self._ffi.NULL)

    for option in options:
        # DetachedSignature, NoCapabilities, and NoAttributes are already
        # handled so we just need to check these last two options.
        if option is pkcs7.PKCS7Options.Text:
            final_flags |= self._lib.PKCS7_TEXT
        elif option is pkcs7.PKCS7Options.Binary:
            final_flags |= self._lib.PKCS7_BINARY

    out_bio = self._create_mem_bio_gc()
    if encoding is serialization.Encoding.SMIME:
        # This finalizes the structure
        status = self._lib.SMIME_write_PKCS7(
            out_bio, p7, data_bio.bio, final_flags
        )
    elif encoding is serialization.Encoding.PEM:
        status = self._lib.PKCS7_final(p7, data_bio.bio, final_flags)
        self.openssl_assert(status == 1)
        status = self._lib.PEM_write_bio_PKCS7_stream(
            out_bio, p7, data_bio.bio, final_flags
        )
    else:
        assert encoding is serialization.Encoding.DER
        # We need to call finalize here because i2d_PKCS7_bio does not
        # finalize.
        status = self._lib.PKCS7_final(p7, data_bio.bio, final_flags)
        self.openssl_assert(status == 1)
        status = self._lib.i2d_PKCS7_bio(out_bio, p7)

    # Whichever writer ran last must have reported success.
    self.openssl_assert(status == 1)
    return self._read_mem_bio(out_bio)
class GetCipherByName(object):
    """
    Callable that looks up an OpenSSL cipher from a name template.

    The template is formatted with ``cipher`` and ``mode`` keyword
    arguments, lower-cased, ASCII encoded and handed to
    ``EVP_get_cipherbyname`` on the backend's bindings.
    """

    def __init__(self, fmt):
        self._fmt = fmt

    def __call__(self, backend, cipher, mode):
        rendered = self._fmt.format(cipher=cipher, mode=mode)
        lookup_name = rendered.lower().encode("ascii")
        return backend._lib.EVP_get_cipherbyname(lookup_name)


def _get_xts_cipher(backend, cipher, mode):
    """
    Look up the OpenSSL AES-XTS cipher for ``cipher``.

    The OpenSSL name is built from half of ``cipher.key_size``; ``mode``
    is accepted for signature compatibility but is not used.
    """
    half_key_bits = cipher.key_size // 2
    lookup_name = "aes-{}-xts".format(half_key_bits).encode("ascii")
    return backend._lib.EVP_get_cipherbyname(lookup_name)


backend = Backend()