1# This file is dual licensed under the terms of the Apache License, Version 2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 3# for complete details. 4 5from __future__ import absolute_import, division, print_function 6 7import base64 8import collections 9import contextlib 10import itertools 11from contextlib import contextmanager 12 13import asn1crypto.core 14 15import six 16from six.moves import range 17 18from cryptography import utils, x509 19from cryptography.exceptions import UnsupportedAlgorithm, _Reasons 20from cryptography.hazmat.backends.interfaces import ( 21 CMACBackend, CipherBackend, DERSerializationBackend, DHBackend, DSABackend, 22 EllipticCurveBackend, HMACBackend, HashBackend, PBKDF2HMACBackend, 23 PEMSerializationBackend, RSABackend, ScryptBackend, X509Backend 24) 25from cryptography.hazmat.backends.openssl import aead 26from cryptography.hazmat.backends.openssl.ciphers import _CipherContext 27from cryptography.hazmat.backends.openssl.cmac import _CMACContext 28from cryptography.hazmat.backends.openssl.decode_asn1 import ( 29 _CRL_ENTRY_REASON_ENUM_TO_CODE, _Integers 30) 31from cryptography.hazmat.backends.openssl.dh import ( 32 _DHParameters, _DHPrivateKey, _DHPublicKey, _dh_params_dup 33) 34from cryptography.hazmat.backends.openssl.dsa import ( 35 _DSAParameters, _DSAPrivateKey, _DSAPublicKey 36) 37from cryptography.hazmat.backends.openssl.ec import ( 38 _EllipticCurvePrivateKey, _EllipticCurvePublicKey 39) 40from cryptography.hazmat.backends.openssl.encode_asn1 import ( 41 _CRL_ENTRY_EXTENSION_ENCODE_HANDLERS, 42 _CRL_EXTENSION_ENCODE_HANDLERS, _EXTENSION_ENCODE_HANDLERS, 43 _OCSP_BASICRESP_EXTENSION_ENCODE_HANDLERS, 44 _OCSP_REQUEST_EXTENSION_ENCODE_HANDLERS, 45 _encode_asn1_int_gc, _encode_asn1_str_gc, _encode_name_gc, _txt2obj_gc, 46) 47from cryptography.hazmat.backends.openssl.hashes import _HashContext 48from cryptography.hazmat.backends.openssl.hmac import _HMACContext 49from cryptography.hazmat.backends.openssl.ocsp import ( 50 _OCSPRequest, _OCSPResponse 51) 52from cryptography.hazmat.backends.openssl.rsa import ( 53 _RSAPrivateKey, _RSAPublicKey 54) 55from cryptography.hazmat.backends.openssl.x25519 import ( 56 _X25519PrivateKey, _X25519PublicKey 57) 58from cryptography.hazmat.backends.openssl.x448 import ( 59 _X448PrivateKey, _X448PublicKey 60) 61from cryptography.hazmat.backends.openssl.x509 import ( 62 _Certificate, _CertificateRevocationList, 63 _CertificateSigningRequest, _RevokedCertificate 64) 65from cryptography.hazmat.bindings.openssl import binding 66from cryptography.hazmat.primitives import hashes, serialization 67from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa 68from cryptography.hazmat.primitives.asymmetric.padding import ( 69 MGF1, OAEP, PKCS1v15, PSS 70) 71from cryptography.hazmat.primitives.ciphers.algorithms import ( 72 AES, ARC4, Blowfish, CAST5, Camellia, ChaCha20, IDEA, SEED, TripleDES 73) 74from cryptography.hazmat.primitives.ciphers.modes import ( 75 CBC, CFB, CFB8, CTR, ECB, GCM, OFB, XTS 76) 77from cryptography.hazmat.primitives.kdf import scrypt 78from cryptography.hazmat.primitives.serialization import ssh 79from cryptography.x509 import ocsp 80 81 82_MemoryBIO = collections.namedtuple("_MemoryBIO", ["bio", "char_ptr"]) 83 84 85@utils.register_interface(CipherBackend) 86@utils.register_interface(CMACBackend) 87@utils.register_interface(DERSerializationBackend) 88@utils.register_interface(DHBackend) 89@utils.register_interface(DSABackend) 90@utils.register_interface(EllipticCurveBackend) 91@utils.register_interface(HashBackend) 92@utils.register_interface(HMACBackend) 93@utils.register_interface(PBKDF2HMACBackend) 94@utils.register_interface(RSABackend) 95@utils.register_interface(PEMSerializationBackend) 96@utils.register_interface(X509Backend) 97@utils.register_interface_if( 98 binding.Binding().lib.Cryptography_HAS_SCRYPT, ScryptBackend 99) 100class Backend(object): 101 """ 102 OpenSSL API binding interfaces. 103 """ 104 name = "openssl" 105 106 def __init__(self): 107 self._binding = binding.Binding() 108 self._ffi = self._binding.ffi 109 self._lib = self._binding.lib 110 111 self._cipher_registry = {} 112 self._register_default_ciphers() 113 self.activate_osrandom_engine() 114 self._dh_types = [self._lib.EVP_PKEY_DH] 115 if self._lib.Cryptography_HAS_EVP_PKEY_DHX: 116 self._dh_types.append(self._lib.EVP_PKEY_DHX) 117 118 def openssl_assert(self, ok): 119 return binding._openssl_assert(self._lib, ok) 120 121 def activate_builtin_random(self): 122 # Obtain a new structural reference. 123 e = self._lib.ENGINE_get_default_RAND() 124 if e != self._ffi.NULL: 125 self._lib.ENGINE_unregister_RAND(e) 126 # Reset the RNG to use the new engine. 127 self._lib.RAND_cleanup() 128 # decrement the structural reference from get_default_RAND 129 res = self._lib.ENGINE_finish(e) 130 self.openssl_assert(res == 1) 131 132 @contextlib.contextmanager 133 def _get_osurandom_engine(self): 134 # Fetches an engine by id and returns it. This creates a structural 135 # reference. 136 e = self._lib.ENGINE_by_id(self._binding._osrandom_engine_id) 137 self.openssl_assert(e != self._ffi.NULL) 138 # Initialize the engine for use. This adds a functional reference. 139 res = self._lib.ENGINE_init(e) 140 self.openssl_assert(res == 1) 141 142 try: 143 yield e 144 finally: 145 # Decrement the structural ref incremented by ENGINE_by_id. 146 res = self._lib.ENGINE_free(e) 147 self.openssl_assert(res == 1) 148 # Decrement the functional ref incremented by ENGINE_init. 149 res = self._lib.ENGINE_finish(e) 150 self.openssl_assert(res == 1) 151 152 def activate_osrandom_engine(self): 153 # Unregister and free the current engine. 154 self.activate_builtin_random() 155 with self._get_osurandom_engine() as e: 156 # Set the engine as the default RAND provider. 157 res = self._lib.ENGINE_set_default_RAND(e) 158 self.openssl_assert(res == 1) 159 # Reset the RNG to use the new engine. 160 self._lib.RAND_cleanup() 161 162 def osrandom_engine_implementation(self): 163 buf = self._ffi.new("char[]", 64) 164 with self._get_osurandom_engine() as e: 165 res = self._lib.ENGINE_ctrl_cmd(e, b"get_implementation", 166 len(buf), buf, 167 self._ffi.NULL, 0) 168 self.openssl_assert(res > 0) 169 return self._ffi.string(buf).decode('ascii') 170 171 def openssl_version_text(self): 172 """ 173 Friendly string name of the loaded OpenSSL library. This is not 174 necessarily the same version as it was compiled against. 175 176 Example: OpenSSL 1.0.1e 11 Feb 2013 177 """ 178 return self._ffi.string( 179 self._lib.OpenSSL_version(self._lib.OPENSSL_VERSION) 180 ).decode("ascii") 181 182 def openssl_version_number(self): 183 return self._lib.OpenSSL_version_num() 184 185 def create_hmac_ctx(self, key, algorithm): 186 return _HMACContext(self, key, algorithm) 187 188 def _evp_md_from_algorithm(self, algorithm): 189 if algorithm.name == "blake2b" or algorithm.name == "blake2s": 190 alg = "{0}{1}".format( 191 algorithm.name, algorithm.digest_size * 8 192 ).encode("ascii") 193 else: 194 alg = algorithm.name.encode("ascii") 195 196 evp_md = self._lib.EVP_get_digestbyname(alg) 197 return evp_md 198 199 def _evp_md_non_null_from_algorithm(self, algorithm): 200 evp_md = self._evp_md_from_algorithm(algorithm) 201 self.openssl_assert(evp_md != self._ffi.NULL) 202 return evp_md 203 204 def hash_supported(self, algorithm): 205 evp_md = self._evp_md_from_algorithm(algorithm) 206 return evp_md != self._ffi.NULL 207 208 def hmac_supported(self, algorithm): 209 return self.hash_supported(algorithm) 210 211 def create_hash_ctx(self, algorithm): 212 return _HashContext(self, algorithm) 213 214 def cipher_supported(self, cipher, mode): 215 try: 216 adapter = self._cipher_registry[type(cipher), type(mode)] 217 except KeyError: 218 return False 219 evp_cipher = adapter(self, cipher, mode) 220 return self._ffi.NULL != evp_cipher 221 222 def register_cipher_adapter(self, cipher_cls, mode_cls, adapter): 223 if (cipher_cls, mode_cls) in self._cipher_registry: 224 raise ValueError("Duplicate registration for: {0} {1}.".format( 225 cipher_cls, mode_cls) 226 ) 227 self._cipher_registry[cipher_cls, mode_cls] = adapter 228 229 def _register_default_ciphers(self): 230 for mode_cls in [CBC, CTR, ECB, OFB, CFB, CFB8, GCM]: 231 self.register_cipher_adapter( 232 AES, 233 mode_cls, 234 GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}") 235 ) 236 for mode_cls in [CBC, CTR, ECB, OFB, CFB]: 237 self.register_cipher_adapter( 238 Camellia, 239 mode_cls, 240 GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}") 241 ) 242 for mode_cls in [CBC, CFB, CFB8, OFB]: 243 self.register_cipher_adapter( 244 TripleDES, 245 mode_cls, 246 GetCipherByName("des-ede3-{mode.name}") 247 ) 248 self.register_cipher_adapter( 249 TripleDES, 250 ECB, 251 GetCipherByName("des-ede3") 252 ) 253 for mode_cls in [CBC, CFB, OFB, ECB]: 254 self.register_cipher_adapter( 255 Blowfish, 256 mode_cls, 257 GetCipherByName("bf-{mode.name}") 258 ) 259 for mode_cls in [CBC, CFB, OFB, ECB]: 260 self.register_cipher_adapter( 261 SEED, 262 mode_cls, 263 GetCipherByName("seed-{mode.name}") 264 ) 265 for cipher_cls, mode_cls in itertools.product( 266 [CAST5, IDEA], 267 [CBC, OFB, CFB, ECB], 268 ): 269 self.register_cipher_adapter( 270 cipher_cls, 271 mode_cls, 272 GetCipherByName("{cipher.name}-{mode.name}") 273 ) 274 self.register_cipher_adapter( 275 ARC4, 276 type(None), 277 GetCipherByName("rc4") 278 ) 279 self.register_cipher_adapter( 280 ChaCha20, 281 type(None), 282 GetCipherByName("chacha20") 283 ) 284 self.register_cipher_adapter(AES, XTS, _get_xts_cipher) 285 286 def create_symmetric_encryption_ctx(self, cipher, mode): 287 return _CipherContext(self, cipher, mode, _CipherContext._ENCRYPT) 288 289 def create_symmetric_decryption_ctx(self, cipher, mode): 290 return _CipherContext(self, cipher, mode, _CipherContext._DECRYPT) 291 292 def pbkdf2_hmac_supported(self, algorithm): 293 return self.hmac_supported(algorithm) 294 295 def derive_pbkdf2_hmac(self, algorithm, length, salt, iterations, 296 key_material): 297 buf = self._ffi.new("unsigned char[]", length) 298 evp_md = self._evp_md_non_null_from_algorithm(algorithm) 299 key_material_ptr = self._ffi.from_buffer(key_material) 300 res = self._lib.PKCS5_PBKDF2_HMAC( 301 key_material_ptr, 302 len(key_material), 303 salt, 304 len(salt), 305 iterations, 306 evp_md, 307 length, 308 buf 309 ) 310 self.openssl_assert(res == 1) 311 return self._ffi.buffer(buf)[:] 312 313 def _consume_errors(self): 314 return binding._consume_errors(self._lib) 315 316 def _bn_to_int(self, bn): 317 assert bn != self._ffi.NULL 318 319 if not six.PY2: 320 # Python 3 has constant time from_bytes, so use that. 321 bn_num_bytes = self._lib.BN_num_bytes(bn) 322 bin_ptr = self._ffi.new("unsigned char[]", bn_num_bytes) 323 bin_len = self._lib.BN_bn2bin(bn, bin_ptr) 324 # A zero length means the BN has value 0 325 self.openssl_assert(bin_len >= 0) 326 return int.from_bytes(self._ffi.buffer(bin_ptr)[:bin_len], "big") 327 else: 328 # Under Python 2 the best we can do is hex() 329 hex_cdata = self._lib.BN_bn2hex(bn) 330 self.openssl_assert(hex_cdata != self._ffi.NULL) 331 hex_str = self._ffi.string(hex_cdata) 332 self._lib.OPENSSL_free(hex_cdata) 333 return int(hex_str, 16) 334 335 def _int_to_bn(self, num, bn=None): 336 """ 337 Converts a python integer to a BIGNUM. The returned BIGNUM will not 338 be garbage collected (to support adding them to structs that take 339 ownership of the object). Be sure to register it for GC if it will 340 be discarded after use. 341 """ 342 assert bn is None or bn != self._ffi.NULL 343 344 if bn is None: 345 bn = self._ffi.NULL 346 347 if not six.PY2: 348 # Python 3 has constant time to_bytes, so use that. 349 350 binary = num.to_bytes(int(num.bit_length() / 8.0 + 1), "big") 351 bn_ptr = self._lib.BN_bin2bn(binary, len(binary), bn) 352 self.openssl_assert(bn_ptr != self._ffi.NULL) 353 return bn_ptr 354 355 else: 356 # Under Python 2 the best we can do is hex(), [2:] removes the 0x 357 # prefix. 358 hex_num = hex(num).rstrip("L")[2:].encode("ascii") 359 bn_ptr = self._ffi.new("BIGNUM **") 360 bn_ptr[0] = bn 361 res = self._lib.BN_hex2bn(bn_ptr, hex_num) 362 self.openssl_assert(res != 0) 363 self.openssl_assert(bn_ptr[0] != self._ffi.NULL) 364 return bn_ptr[0] 365 366 def generate_rsa_private_key(self, public_exponent, key_size): 367 rsa._verify_rsa_parameters(public_exponent, key_size) 368 369 rsa_cdata = self._lib.RSA_new() 370 self.openssl_assert(rsa_cdata != self._ffi.NULL) 371 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 372 373 bn = self._int_to_bn(public_exponent) 374 bn = self._ffi.gc(bn, self._lib.BN_free) 375 376 res = self._lib.RSA_generate_key_ex( 377 rsa_cdata, key_size, bn, self._ffi.NULL 378 ) 379 self.openssl_assert(res == 1) 380 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata) 381 382 return _RSAPrivateKey(self, rsa_cdata, evp_pkey) 383 384 def generate_rsa_parameters_supported(self, public_exponent, key_size): 385 return (public_exponent >= 3 and public_exponent & 1 != 0 and 386 key_size >= 512) 387 388 def load_rsa_private_numbers(self, numbers): 389 rsa._check_private_key_components( 390 numbers.p, 391 numbers.q, 392 numbers.d, 393 numbers.dmp1, 394 numbers.dmq1, 395 numbers.iqmp, 396 numbers.public_numbers.e, 397 numbers.public_numbers.n 398 ) 399 rsa_cdata = self._lib.RSA_new() 400 self.openssl_assert(rsa_cdata != self._ffi.NULL) 401 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 402 p = self._int_to_bn(numbers.p) 403 q = self._int_to_bn(numbers.q) 404 d = self._int_to_bn(numbers.d) 405 dmp1 = self._int_to_bn(numbers.dmp1) 406 dmq1 = self._int_to_bn(numbers.dmq1) 407 iqmp = self._int_to_bn(numbers.iqmp) 408 e = self._int_to_bn(numbers.public_numbers.e) 409 n = self._int_to_bn(numbers.public_numbers.n) 410 res = self._lib.RSA_set0_factors(rsa_cdata, p, q) 411 self.openssl_assert(res == 1) 412 res = self._lib.RSA_set0_key(rsa_cdata, n, e, d) 413 self.openssl_assert(res == 1) 414 res = self._lib.RSA_set0_crt_params(rsa_cdata, dmp1, dmq1, iqmp) 415 self.openssl_assert(res == 1) 416 res = self._lib.RSA_blinding_on(rsa_cdata, self._ffi.NULL) 417 self.openssl_assert(res == 1) 418 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata) 419 420 return _RSAPrivateKey(self, rsa_cdata, evp_pkey) 421 422 def load_rsa_public_numbers(self, numbers): 423 rsa._check_public_key_components(numbers.e, numbers.n) 424 rsa_cdata = self._lib.RSA_new() 425 self.openssl_assert(rsa_cdata != self._ffi.NULL) 426 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 427 e = self._int_to_bn(numbers.e) 428 n = self._int_to_bn(numbers.n) 429 res = self._lib.RSA_set0_key(rsa_cdata, n, e, self._ffi.NULL) 430 self.openssl_assert(res == 1) 431 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata) 432 433 return _RSAPublicKey(self, rsa_cdata, evp_pkey) 434 435 def _create_evp_pkey_gc(self): 436 evp_pkey = self._lib.EVP_PKEY_new() 437 self.openssl_assert(evp_pkey != self._ffi.NULL) 438 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) 439 return evp_pkey 440 441 def _rsa_cdata_to_evp_pkey(self, rsa_cdata): 442 evp_pkey = self._create_evp_pkey_gc() 443 res = self._lib.EVP_PKEY_set1_RSA(evp_pkey, rsa_cdata) 444 self.openssl_assert(res == 1) 445 return evp_pkey 446 447 def _bytes_to_bio(self, data): 448 """ 449 Return a _MemoryBIO namedtuple of (BIO, char*). 450 451 The char* is the storage for the BIO and it must stay alive until the 452 BIO is finished with. 453 """ 454 data_ptr = self._ffi.from_buffer(data) 455 bio = self._lib.BIO_new_mem_buf( 456 data_ptr, len(data) 457 ) 458 self.openssl_assert(bio != self._ffi.NULL) 459 460 return _MemoryBIO(self._ffi.gc(bio, self._lib.BIO_free), data_ptr) 461 462 def _create_mem_bio_gc(self): 463 """ 464 Creates an empty memory BIO. 465 """ 466 bio_method = self._lib.BIO_s_mem() 467 self.openssl_assert(bio_method != self._ffi.NULL) 468 bio = self._lib.BIO_new(bio_method) 469 self.openssl_assert(bio != self._ffi.NULL) 470 bio = self._ffi.gc(bio, self._lib.BIO_free) 471 return bio 472 473 def _read_mem_bio(self, bio): 474 """ 475 Reads a memory BIO. This only works on memory BIOs. 476 """ 477 buf = self._ffi.new("char **") 478 buf_len = self._lib.BIO_get_mem_data(bio, buf) 479 self.openssl_assert(buf_len > 0) 480 self.openssl_assert(buf[0] != self._ffi.NULL) 481 bio_data = self._ffi.buffer(buf[0], buf_len)[:] 482 return bio_data 483 484 def _evp_pkey_to_private_key(self, evp_pkey): 485 """ 486 Return the appropriate type of PrivateKey given an evp_pkey cdata 487 pointer. 488 """ 489 490 key_type = self._lib.EVP_PKEY_id(evp_pkey) 491 492 if key_type == self._lib.EVP_PKEY_RSA: 493 rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey) 494 self.openssl_assert(rsa_cdata != self._ffi.NULL) 495 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 496 return _RSAPrivateKey(self, rsa_cdata, evp_pkey) 497 elif key_type == self._lib.EVP_PKEY_DSA: 498 dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey) 499 self.openssl_assert(dsa_cdata != self._ffi.NULL) 500 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) 501 return _DSAPrivateKey(self, dsa_cdata, evp_pkey) 502 elif key_type == self._lib.EVP_PKEY_EC: 503 ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey) 504 self.openssl_assert(ec_cdata != self._ffi.NULL) 505 ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free) 506 return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey) 507 elif key_type in self._dh_types: 508 dh_cdata = self._lib.EVP_PKEY_get1_DH(evp_pkey) 509 self.openssl_assert(dh_cdata != self._ffi.NULL) 510 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free) 511 return _DHPrivateKey(self, dh_cdata, evp_pkey) 512 elif key_type == getattr(self._lib, "EVP_PKEY_X448", None): 513 # EVP_PKEY_X448 is not present in OpenSSL < 1.1.1 514 return _X448PrivateKey(self, evp_pkey) 515 elif key_type == getattr(self._lib, "EVP_PKEY_X25519", None): 516 # EVP_PKEY_X25519 is not present in OpenSSL < 1.1.0 517 return _X25519PrivateKey(self, evp_pkey) 518 else: 519 raise UnsupportedAlgorithm("Unsupported key type.") 520 521 def _evp_pkey_to_public_key(self, evp_pkey): 522 """ 523 Return the appropriate type of PublicKey given an evp_pkey cdata 524 pointer. 525 """ 526 527 key_type = self._lib.EVP_PKEY_id(evp_pkey) 528 529 if key_type == self._lib.EVP_PKEY_RSA: 530 rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey) 531 self.openssl_assert(rsa_cdata != self._ffi.NULL) 532 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 533 return _RSAPublicKey(self, rsa_cdata, evp_pkey) 534 elif key_type == self._lib.EVP_PKEY_DSA: 535 dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey) 536 self.openssl_assert(dsa_cdata != self._ffi.NULL) 537 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) 538 return _DSAPublicKey(self, dsa_cdata, evp_pkey) 539 elif key_type == self._lib.EVP_PKEY_EC: 540 ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey) 541 self.openssl_assert(ec_cdata != self._ffi.NULL) 542 ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free) 543 return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey) 544 elif key_type in self._dh_types: 545 dh_cdata = self._lib.EVP_PKEY_get1_DH(evp_pkey) 546 self.openssl_assert(dh_cdata != self._ffi.NULL) 547 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free) 548 return _DHPublicKey(self, dh_cdata, evp_pkey) 549 elif key_type == getattr(self._lib, "EVP_PKEY_X448", None): 550 # EVP_PKEY_X448 is not present in OpenSSL < 1.1.1 551 return _X448PublicKey(self, evp_pkey) 552 elif key_type == getattr(self._lib, "EVP_PKEY_X25519", None): 553 # EVP_PKEY_X25519 is not present in OpenSSL < 1.1.0 554 return _X25519PublicKey(self, evp_pkey) 555 else: 556 raise UnsupportedAlgorithm("Unsupported key type.") 557 558 def _oaep_hash_supported(self, algorithm): 559 if self._lib.Cryptography_HAS_RSA_OAEP_MD: 560 return isinstance( 561 algorithm, ( 562 hashes.SHA1, 563 hashes.SHA224, 564 hashes.SHA256, 565 hashes.SHA384, 566 hashes.SHA512, 567 ) 568 ) 569 else: 570 return isinstance(algorithm, hashes.SHA1) 571 572 def rsa_padding_supported(self, padding): 573 if isinstance(padding, PKCS1v15): 574 return True 575 elif isinstance(padding, PSS) and isinstance(padding._mgf, MGF1): 576 return self.hash_supported(padding._mgf._algorithm) 577 elif isinstance(padding, OAEP) and isinstance(padding._mgf, MGF1): 578 return ( 579 self._oaep_hash_supported(padding._mgf._algorithm) and 580 self._oaep_hash_supported(padding._algorithm) and 581 ( 582 (padding._label is None or len(padding._label) == 0) or 583 self._lib.Cryptography_HAS_RSA_OAEP_LABEL == 1 584 ) 585 ) 586 else: 587 return False 588 589 def generate_dsa_parameters(self, key_size): 590 if key_size not in (1024, 2048, 3072): 591 raise ValueError("Key size must be 1024 or 2048 or 3072 bits.") 592 593 ctx = self._lib.DSA_new() 594 self.openssl_assert(ctx != self._ffi.NULL) 595 ctx = self._ffi.gc(ctx, self._lib.DSA_free) 596 597 res = self._lib.DSA_generate_parameters_ex( 598 ctx, key_size, self._ffi.NULL, 0, 599 self._ffi.NULL, self._ffi.NULL, self._ffi.NULL 600 ) 601 602 self.openssl_assert(res == 1) 603 604 return _DSAParameters(self, ctx) 605 606 def generate_dsa_private_key(self, parameters): 607 ctx = self._lib.DSAparams_dup(parameters._dsa_cdata) 608 self.openssl_assert(ctx != self._ffi.NULL) 609 ctx = self._ffi.gc(ctx, self._lib.DSA_free) 610 self._lib.DSA_generate_key(ctx) 611 evp_pkey = self._dsa_cdata_to_evp_pkey(ctx) 612 613 return _DSAPrivateKey(self, ctx, evp_pkey) 614 615 def generate_dsa_private_key_and_parameters(self, key_size): 616 parameters = self.generate_dsa_parameters(key_size) 617 return self.generate_dsa_private_key(parameters) 618 619 def _dsa_cdata_set_values(self, dsa_cdata, p, q, g, pub_key, priv_key): 620 res = self._lib.DSA_set0_pqg(dsa_cdata, p, q, g) 621 self.openssl_assert(res == 1) 622 res = self._lib.DSA_set0_key(dsa_cdata, pub_key, priv_key) 623 self.openssl_assert(res == 1) 624 625 def load_dsa_private_numbers(self, numbers): 626 dsa._check_dsa_private_numbers(numbers) 627 parameter_numbers = numbers.public_numbers.parameter_numbers 628 629 dsa_cdata = self._lib.DSA_new() 630 self.openssl_assert(dsa_cdata != self._ffi.NULL) 631 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) 632 633 p = self._int_to_bn(parameter_numbers.p) 634 q = self._int_to_bn(parameter_numbers.q) 635 g = self._int_to_bn(parameter_numbers.g) 636 pub_key = self._int_to_bn(numbers.public_numbers.y) 637 priv_key = self._int_to_bn(numbers.x) 638 self._dsa_cdata_set_values(dsa_cdata, p, q, g, pub_key, priv_key) 639 640 evp_pkey = self._dsa_cdata_to_evp_pkey(dsa_cdata) 641 642 return _DSAPrivateKey(self, dsa_cdata, evp_pkey) 643 644 def load_dsa_public_numbers(self, numbers): 645 dsa._check_dsa_parameters(numbers.parameter_numbers) 646 dsa_cdata = self._lib.DSA_new() 647 self.openssl_assert(dsa_cdata != self._ffi.NULL) 648 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) 649 650 p = self._int_to_bn(numbers.parameter_numbers.p) 651 q = self._int_to_bn(numbers.parameter_numbers.q) 652 g = self._int_to_bn(numbers.parameter_numbers.g) 653 pub_key = self._int_to_bn(numbers.y) 654 priv_key = self._ffi.NULL 655 self._dsa_cdata_set_values(dsa_cdata, p, q, g, pub_key, priv_key) 656 657 evp_pkey = self._dsa_cdata_to_evp_pkey(dsa_cdata) 658 659 return _DSAPublicKey(self, dsa_cdata, evp_pkey) 660 661 def load_dsa_parameter_numbers(self, numbers): 662 dsa._check_dsa_parameters(numbers) 663 dsa_cdata = self._lib.DSA_new() 664 self.openssl_assert(dsa_cdata != self._ffi.NULL) 665 dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free) 666 667 p = self._int_to_bn(numbers.p) 668 q = self._int_to_bn(numbers.q) 669 g = self._int_to_bn(numbers.g) 670 res = self._lib.DSA_set0_pqg(dsa_cdata, p, q, g) 671 self.openssl_assert(res == 1) 672 673 return _DSAParameters(self, dsa_cdata) 674 675 def _dsa_cdata_to_evp_pkey(self, dsa_cdata): 676 evp_pkey = self._create_evp_pkey_gc() 677 res = self._lib.EVP_PKEY_set1_DSA(evp_pkey, dsa_cdata) 678 self.openssl_assert(res == 1) 679 return evp_pkey 680 681 def dsa_hash_supported(self, algorithm): 682 return self.hash_supported(algorithm) 683 684 def dsa_parameters_supported(self, p, q, g): 685 return True 686 687 def cmac_algorithm_supported(self, algorithm): 688 return self.cipher_supported( 689 algorithm, CBC(b"\x00" * algorithm.block_size) 690 ) 691 692 def create_cmac_ctx(self, algorithm): 693 return _CMACContext(self, algorithm) 694 695 def create_x509_csr(self, builder, private_key, algorithm): 696 if not isinstance(algorithm, hashes.HashAlgorithm): 697 raise TypeError('Algorithm must be a registered hash algorithm.') 698 699 if ( 700 isinstance(algorithm, hashes.MD5) and not 701 isinstance(private_key, rsa.RSAPrivateKey) 702 ): 703 raise ValueError( 704 "MD5 is not a supported hash algorithm for EC/DSA CSRs" 705 ) 706 707 # Resolve the signature algorithm. 708 evp_md = self._evp_md_non_null_from_algorithm(algorithm) 709 710 # Create an empty request. 711 x509_req = self._lib.X509_REQ_new() 712 self.openssl_assert(x509_req != self._ffi.NULL) 713 x509_req = self._ffi.gc(x509_req, self._lib.X509_REQ_free) 714 715 # Set x509 version. 716 res = self._lib.X509_REQ_set_version(x509_req, x509.Version.v1.value) 717 self.openssl_assert(res == 1) 718 719 # Set subject name. 720 res = self._lib.X509_REQ_set_subject_name( 721 x509_req, _encode_name_gc(self, builder._subject_name) 722 ) 723 self.openssl_assert(res == 1) 724 725 # Set subject public key. 726 public_key = private_key.public_key() 727 res = self._lib.X509_REQ_set_pubkey( 728 x509_req, public_key._evp_pkey 729 ) 730 self.openssl_assert(res == 1) 731 732 # Add extensions. 733 sk_extension = self._lib.sk_X509_EXTENSION_new_null() 734 self.openssl_assert(sk_extension != self._ffi.NULL) 735 sk_extension = self._ffi.gc( 736 sk_extension, 737 lambda x: self._lib.sk_X509_EXTENSION_pop_free( 738 x, self._ffi.addressof( 739 self._lib._original_lib, "X509_EXTENSION_free" 740 ) 741 ) 742 ) 743 # Don't GC individual extensions because the memory is owned by 744 # sk_extensions and will be freed along with it. 745 self._create_x509_extensions( 746 extensions=builder._extensions, 747 handlers=_EXTENSION_ENCODE_HANDLERS, 748 x509_obj=sk_extension, 749 add_func=self._lib.sk_X509_EXTENSION_insert, 750 gc=False 751 ) 752 res = self._lib.X509_REQ_add_extensions(x509_req, sk_extension) 753 self.openssl_assert(res == 1) 754 755 # Sign the request using the requester's private key. 756 res = self._lib.X509_REQ_sign( 757 x509_req, private_key._evp_pkey, evp_md 758 ) 759 if res == 0: 760 errors = self._consume_errors() 761 self.openssl_assert( 762 errors[0]._lib_reason_match( 763 self._lib.ERR_LIB_RSA, 764 self._lib.RSA_R_DIGEST_TOO_BIG_FOR_RSA_KEY 765 ) 766 ) 767 768 raise ValueError("Digest too big for RSA key") 769 770 return _CertificateSigningRequest(self, x509_req) 771 772 def create_x509_certificate(self, builder, private_key, algorithm): 773 if not isinstance(builder, x509.CertificateBuilder): 774 raise TypeError('Builder type mismatch.') 775 if not isinstance(algorithm, hashes.HashAlgorithm): 776 raise TypeError('Algorithm must be a registered hash algorithm.') 777 778 if ( 779 isinstance(algorithm, hashes.MD5) and not 780 isinstance(private_key, rsa.RSAPrivateKey) 781 ): 782 raise ValueError( 783 "MD5 is not a supported hash algorithm for EC/DSA certificates" 784 ) 785 786 # Resolve the signature algorithm. 787 evp_md = self._evp_md_non_null_from_algorithm(algorithm) 788 789 # Create an empty certificate. 790 x509_cert = self._lib.X509_new() 791 x509_cert = self._ffi.gc(x509_cert, backend._lib.X509_free) 792 793 # Set the x509 version. 794 res = self._lib.X509_set_version(x509_cert, builder._version.value) 795 self.openssl_assert(res == 1) 796 797 # Set the subject's name. 798 res = self._lib.X509_set_subject_name( 799 x509_cert, _encode_name_gc(self, builder._subject_name) 800 ) 801 self.openssl_assert(res == 1) 802 803 # Set the subject's public key. 804 res = self._lib.X509_set_pubkey( 805 x509_cert, builder._public_key._evp_pkey 806 ) 807 self.openssl_assert(res == 1) 808 809 # Set the certificate serial number. 810 serial_number = _encode_asn1_int_gc(self, builder._serial_number) 811 res = self._lib.X509_set_serialNumber(x509_cert, serial_number) 812 self.openssl_assert(res == 1) 813 814 # Set the "not before" time. 815 self._set_asn1_time( 816 self._lib.X509_get_notBefore(x509_cert), builder._not_valid_before 817 ) 818 819 # Set the "not after" time. 820 self._set_asn1_time( 821 self._lib.X509_get_notAfter(x509_cert), builder._not_valid_after 822 ) 823 824 # Add extensions. 825 self._create_x509_extensions( 826 extensions=builder._extensions, 827 handlers=_EXTENSION_ENCODE_HANDLERS, 828 x509_obj=x509_cert, 829 add_func=self._lib.X509_add_ext, 830 gc=True 831 ) 832 833 # Set the issuer name. 834 res = self._lib.X509_set_issuer_name( 835 x509_cert, _encode_name_gc(self, builder._issuer_name) 836 ) 837 self.openssl_assert(res == 1) 838 839 # Sign the certificate with the issuer's private key. 840 res = self._lib.X509_sign( 841 x509_cert, private_key._evp_pkey, evp_md 842 ) 843 if res == 0: 844 errors = self._consume_errors() 845 self.openssl_assert( 846 errors[0]._lib_reason_match( 847 self._lib.ERR_LIB_RSA, 848 self._lib.RSA_R_DIGEST_TOO_BIG_FOR_RSA_KEY 849 ) 850 ) 851 raise ValueError("Digest too big for RSA key") 852 853 return _Certificate(self, x509_cert) 854 855 def _set_asn1_time(self, asn1_time, time): 856 if time.year >= 2050: 857 asn1_str = time.strftime('%Y%m%d%H%M%SZ').encode('ascii') 858 else: 859 asn1_str = time.strftime('%y%m%d%H%M%SZ').encode('ascii') 860 res = self._lib.ASN1_TIME_set_string(asn1_time, asn1_str) 861 self.openssl_assert(res == 1) 862 863 def _create_asn1_time(self, time): 864 asn1_time = self._lib.ASN1_TIME_new() 865 self.openssl_assert(asn1_time != self._ffi.NULL) 866 asn1_time = self._ffi.gc(asn1_time, self._lib.ASN1_TIME_free) 867 self._set_asn1_time(asn1_time, time) 868 return asn1_time 869 870 def create_x509_crl(self, builder, private_key, algorithm): 871 if not isinstance(builder, x509.CertificateRevocationListBuilder): 872 raise TypeError('Builder type mismatch.') 873 if not isinstance(algorithm, hashes.HashAlgorithm): 874 raise TypeError('Algorithm must be a registered hash algorithm.') 875 876 if ( 877 isinstance(algorithm, hashes.MD5) and not 878 isinstance(private_key, rsa.RSAPrivateKey) 879 ): 880 raise ValueError( 881 "MD5 is not a supported hash algorithm for EC/DSA CRLs" 882 ) 883 884 evp_md = self._evp_md_non_null_from_algorithm(algorithm) 885 886 # Create an empty CRL. 887 x509_crl = self._lib.X509_CRL_new() 888 x509_crl = self._ffi.gc(x509_crl, backend._lib.X509_CRL_free) 889 890 # Set the x509 CRL version. We only support v2 (integer value 1). 891 res = self._lib.X509_CRL_set_version(x509_crl, 1) 892 self.openssl_assert(res == 1) 893 894 # Set the issuer name. 895 res = self._lib.X509_CRL_set_issuer_name( 896 x509_crl, _encode_name_gc(self, builder._issuer_name) 897 ) 898 self.openssl_assert(res == 1) 899 900 # Set the last update time. 901 last_update = self._create_asn1_time(builder._last_update) 902 res = self._lib.X509_CRL_set_lastUpdate(x509_crl, last_update) 903 self.openssl_assert(res == 1) 904 905 # Set the next update time. 906 next_update = self._create_asn1_time(builder._next_update) 907 res = self._lib.X509_CRL_set_nextUpdate(x509_crl, next_update) 908 self.openssl_assert(res == 1) 909 910 # Add extensions. 911 self._create_x509_extensions( 912 extensions=builder._extensions, 913 handlers=_CRL_EXTENSION_ENCODE_HANDLERS, 914 x509_obj=x509_crl, 915 add_func=self._lib.X509_CRL_add_ext, 916 gc=True 917 ) 918 919 # add revoked certificates 920 for revoked_cert in builder._revoked_certificates: 921 # Duplicating because the X509_CRL takes ownership and will free 922 # this memory when X509_CRL_free is called. 923 revoked = self._lib.Cryptography_X509_REVOKED_dup( 924 revoked_cert._x509_revoked 925 ) 926 self.openssl_assert(revoked != self._ffi.NULL) 927 res = self._lib.X509_CRL_add0_revoked(x509_crl, revoked) 928 self.openssl_assert(res == 1) 929 930 res = self._lib.X509_CRL_sign( 931 x509_crl, private_key._evp_pkey, evp_md 932 ) 933 if res == 0: 934 errors = self._consume_errors() 935 self.openssl_assert( 936 errors[0]._lib_reason_match( 937 self._lib.ERR_LIB_RSA, 938 self._lib.RSA_R_DIGEST_TOO_BIG_FOR_RSA_KEY 939 ) 940 ) 941 raise ValueError("Digest too big for RSA key") 942 943 return _CertificateRevocationList(self, x509_crl) 944 945 def _create_x509_extensions(self, extensions, handlers, x509_obj, 946 add_func, gc): 947 for i, extension in enumerate(extensions): 948 x509_extension = self._create_x509_extension( 949 handlers, extension 950 ) 951 self.openssl_assert(x509_extension != self._ffi.NULL) 952 953 if gc: 954 x509_extension = self._ffi.gc( 955 x509_extension, self._lib.X509_EXTENSION_free 956 ) 957 res = add_func(x509_obj, x509_extension, i) 958 self.openssl_assert(res >= 1) 959 960 def _create_raw_x509_extension(self, extension, value): 961 obj = _txt2obj_gc(self, extension.oid.dotted_string) 962 return self._lib.X509_EXTENSION_create_by_OBJ( 963 self._ffi.NULL, obj, 1 if extension.critical else 0, value 964 ) 965 966 def _create_x509_extension(self, handlers, extension): 967 if isinstance(extension.value, x509.UnrecognizedExtension): 968 value = _encode_asn1_str_gc(self, extension.value.value) 969 return self._create_raw_x509_extension(extension, value) 970 elif isinstance(extension.value, x509.TLSFeature): 971 asn1 = _Integers([x.value for x in extension.value]).dump() 972 value = _encode_asn1_str_gc(self, asn1) 973 return self._create_raw_x509_extension(extension, value) 974 elif isinstance(extension.value, x509.PrecertPoison): 975 asn1 = asn1crypto.core.Null().dump() 976 value = _encode_asn1_str_gc(self, asn1) 977 return self._create_raw_x509_extension(extension, value) 978 else: 979 try: 980 encode = handlers[extension.oid] 981 except KeyError: 982 raise NotImplementedError( 983 'Extension not supported: {0}'.format(extension.oid) 984 ) 985 986 ext_struct = encode(self, extension.value) 987 nid = self._lib.OBJ_txt2nid( 988 extension.oid.dotted_string.encode("ascii") 989 ) 990 backend.openssl_assert(nid != self._lib.NID_undef) 991 return self._lib.X509V3_EXT_i2d( 992 nid, 1 if extension.critical else 0, ext_struct 993 ) 994 995 def create_x509_revoked_certificate(self, builder): 996 if not isinstance(builder, x509.RevokedCertificateBuilder): 997 raise TypeError('Builder type mismatch.') 998 999 x509_revoked = self._lib.X509_REVOKED_new() 1000 self.openssl_assert(x509_revoked != self._ffi.NULL) 1001 x509_revoked = self._ffi.gc(x509_revoked, self._lib.X509_REVOKED_free) 1002 serial_number = _encode_asn1_int_gc(self, builder._serial_number) 1003 res = self._lib.X509_REVOKED_set_serialNumber( 1004 x509_revoked, serial_number 1005 ) 1006 self.openssl_assert(res == 1) 1007 rev_date = self._create_asn1_time(builder._revocation_date) 1008 res = self._lib.X509_REVOKED_set_revocationDate(x509_revoked, rev_date) 1009 self.openssl_assert(res == 1) 1010 # add CRL entry extensions 1011 self._create_x509_extensions( 1012 extensions=builder._extensions, 1013 handlers=_CRL_ENTRY_EXTENSION_ENCODE_HANDLERS, 1014 x509_obj=x509_revoked, 1015 add_func=self._lib.X509_REVOKED_add_ext, 1016 gc=True 1017 ) 1018 return _RevokedCertificate(self, None, x509_revoked) 1019 1020 def load_pem_private_key(self, data, password): 1021 return self._load_key( 1022 self._lib.PEM_read_bio_PrivateKey, 1023 self._evp_pkey_to_private_key, 1024 data, 1025 password, 1026 ) 1027 1028 def load_pem_public_key(self, data): 1029 mem_bio = self._bytes_to_bio(data) 1030 evp_pkey = self._lib.PEM_read_bio_PUBKEY( 1031 mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL 1032 ) 1033 if evp_pkey != self._ffi.NULL: 1034 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) 1035 return self._evp_pkey_to_public_key(evp_pkey) 1036 else: 1037 # It's not a (RSA/DSA/ECDSA) subjectPublicKeyInfo, but we still 1038 # need to check to see if it is a pure PKCS1 RSA public key (not 1039 # embedded in a subjectPublicKeyInfo) 1040 self._consume_errors() 1041 res = self._lib.BIO_reset(mem_bio.bio) 1042 self.openssl_assert(res == 1) 1043 rsa_cdata = self._lib.PEM_read_bio_RSAPublicKey( 1044 mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL 1045 ) 1046 if rsa_cdata != self._ffi.NULL: 1047 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 1048 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata) 1049 return _RSAPublicKey(self, rsa_cdata, evp_pkey) 1050 else: 1051 self._handle_key_loading_error() 1052 1053 def load_pem_parameters(self, data): 1054 mem_bio = self._bytes_to_bio(data) 1055 # only DH is supported currently 1056 dh_cdata = self._lib.PEM_read_bio_DHparams( 1057 mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL) 1058 if dh_cdata != self._ffi.NULL: 1059 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free) 1060 return _DHParameters(self, dh_cdata) 1061 else: 1062 self._handle_key_loading_error() 1063 1064 def load_der_private_key(self, data, password): 1065 # OpenSSL has a function called d2i_AutoPrivateKey that in theory 1066 # handles this automatically, however it doesn't handle encrypted 1067 # private keys. Instead we try to load the key two different ways. 1068 # First we'll try to load it as a traditional key. 1069 bio_data = self._bytes_to_bio(data) 1070 key = self._evp_pkey_from_der_traditional_key(bio_data, password) 1071 if key: 1072 return self._evp_pkey_to_private_key(key) 1073 else: 1074 # Finally we try to load it with the method that handles encrypted 1075 # PKCS8 properly. 1076 return self._load_key( 1077 self._lib.d2i_PKCS8PrivateKey_bio, 1078 self._evp_pkey_to_private_key, 1079 data, 1080 password, 1081 ) 1082 1083 def _evp_pkey_from_der_traditional_key(self, bio_data, password): 1084 key = self._lib.d2i_PrivateKey_bio(bio_data.bio, self._ffi.NULL) 1085 if key != self._ffi.NULL: 1086 key = self._ffi.gc(key, self._lib.EVP_PKEY_free) 1087 if password is not None: 1088 raise TypeError( 1089 "Password was given but private key is not encrypted." 1090 ) 1091 1092 return key 1093 else: 1094 self._consume_errors() 1095 return None 1096 1097 def load_der_public_key(self, data): 1098 mem_bio = self._bytes_to_bio(data) 1099 evp_pkey = self._lib.d2i_PUBKEY_bio(mem_bio.bio, self._ffi.NULL) 1100 if evp_pkey != self._ffi.NULL: 1101 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) 1102 return self._evp_pkey_to_public_key(evp_pkey) 1103 else: 1104 # It's not a (RSA/DSA/ECDSA) subjectPublicKeyInfo, but we still 1105 # need to check to see if it is a pure PKCS1 RSA public key (not 1106 # embedded in a subjectPublicKeyInfo) 1107 self._consume_errors() 1108 res = self._lib.BIO_reset(mem_bio.bio) 1109 self.openssl_assert(res == 1) 1110 rsa_cdata = self._lib.d2i_RSAPublicKey_bio( 1111 mem_bio.bio, self._ffi.NULL 1112 ) 1113 if rsa_cdata != self._ffi.NULL: 1114 rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free) 1115 evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata) 1116 return _RSAPublicKey(self, rsa_cdata, evp_pkey) 1117 else: 1118 self._handle_key_loading_error() 1119 1120 def load_der_parameters(self, data): 1121 mem_bio = self._bytes_to_bio(data) 1122 dh_cdata = self._lib.d2i_DHparams_bio( 1123 mem_bio.bio, self._ffi.NULL 1124 ) 1125 if dh_cdata != self._ffi.NULL: 1126 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free) 1127 return _DHParameters(self, dh_cdata) 1128 elif self._lib.Cryptography_HAS_EVP_PKEY_DHX: 1129 # We check to see if the is dhx. 1130 self._consume_errors() 1131 res = self._lib.BIO_reset(mem_bio.bio) 1132 self.openssl_assert(res == 1) 1133 dh_cdata = self._lib.Cryptography_d2i_DHxparams_bio( 1134 mem_bio.bio, self._ffi.NULL 1135 ) 1136 if dh_cdata != self._ffi.NULL: 1137 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free) 1138 return _DHParameters(self, dh_cdata) 1139 1140 self._handle_key_loading_error() 1141 1142 def load_pem_x509_certificate(self, data): 1143 mem_bio = self._bytes_to_bio(data) 1144 x509 = self._lib.PEM_read_bio_X509( 1145 mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL 1146 ) 1147 if x509 == self._ffi.NULL: 1148 self._consume_errors() 1149 raise ValueError( 1150 "Unable to load certificate. See https://cryptography.io/en/la" 1151 "test/faq/#why-can-t-i-import-my-pem-file for more details." 1152 ) 1153 1154 x509 = self._ffi.gc(x509, self._lib.X509_free) 1155 return _Certificate(self, x509) 1156 1157 def load_der_x509_certificate(self, data): 1158 mem_bio = self._bytes_to_bio(data) 1159 x509 = self._lib.d2i_X509_bio(mem_bio.bio, self._ffi.NULL) 1160 if x509 == self._ffi.NULL: 1161 self._consume_errors() 1162 raise ValueError("Unable to load certificate") 1163 1164 x509 = self._ffi.gc(x509, self._lib.X509_free) 1165 return _Certificate(self, x509) 1166 1167 def load_pem_x509_crl(self, data): 1168 mem_bio = self._bytes_to_bio(data) 1169 x509_crl = self._lib.PEM_read_bio_X509_CRL( 1170 mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL 1171 ) 1172 if x509_crl == self._ffi.NULL: 1173 self._consume_errors() 1174 raise ValueError( 1175 "Unable to load CRL. See https://cryptography.io/en/la" 1176 "test/faq/#why-can-t-i-import-my-pem-file for more details." 1177 ) 1178 1179 x509_crl = self._ffi.gc(x509_crl, self._lib.X509_CRL_free) 1180 return _CertificateRevocationList(self, x509_crl) 1181 1182 def load_der_x509_crl(self, data): 1183 mem_bio = self._bytes_to_bio(data) 1184 x509_crl = self._lib.d2i_X509_CRL_bio(mem_bio.bio, self._ffi.NULL) 1185 if x509_crl == self._ffi.NULL: 1186 self._consume_errors() 1187 raise ValueError("Unable to load CRL") 1188 1189 x509_crl = self._ffi.gc(x509_crl, self._lib.X509_CRL_free) 1190 return _CertificateRevocationList(self, x509_crl) 1191 1192 def load_pem_x509_csr(self, data): 1193 mem_bio = self._bytes_to_bio(data) 1194 x509_req = self._lib.PEM_read_bio_X509_REQ( 1195 mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL 1196 ) 1197 if x509_req == self._ffi.NULL: 1198 self._consume_errors() 1199 raise ValueError( 1200 "Unable to load request. See https://cryptography.io/en/la" 1201 "test/faq/#why-can-t-i-import-my-pem-file for more details." 1202 ) 1203 1204 x509_req = self._ffi.gc(x509_req, self._lib.X509_REQ_free) 1205 return _CertificateSigningRequest(self, x509_req) 1206 1207 def load_der_x509_csr(self, data): 1208 mem_bio = self._bytes_to_bio(data) 1209 x509_req = self._lib.d2i_X509_REQ_bio(mem_bio.bio, self._ffi.NULL) 1210 if x509_req == self._ffi.NULL: 1211 self._consume_errors() 1212 raise ValueError("Unable to load request") 1213 1214 x509_req = self._ffi.gc(x509_req, self._lib.X509_REQ_free) 1215 return _CertificateSigningRequest(self, x509_req) 1216 1217 def _load_key(self, openssl_read_func, convert_func, data, password): 1218 mem_bio = self._bytes_to_bio(data) 1219 1220 userdata = self._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *") 1221 if password is not None: 1222 utils._check_byteslike("password", password) 1223 password_ptr = self._ffi.from_buffer(password) 1224 userdata.password = password_ptr 1225 userdata.length = len(password) 1226 1227 evp_pkey = openssl_read_func( 1228 mem_bio.bio, 1229 self._ffi.NULL, 1230 self._ffi.addressof( 1231 self._lib._original_lib, "Cryptography_pem_password_cb" 1232 ), 1233 userdata, 1234 ) 1235 1236 if evp_pkey == self._ffi.NULL: 1237 if userdata.error != 0: 1238 errors = self._consume_errors() 1239 self.openssl_assert(errors) 1240 if userdata.error == -1: 1241 raise TypeError( 1242 "Password was not given but private key is encrypted" 1243 ) 1244 else: 1245 assert userdata.error == -2 1246 raise ValueError( 1247 "Passwords longer than {0} bytes are not supported " 1248 "by this backend.".format(userdata.maxsize - 1) 1249 ) 1250 else: 1251 self._handle_key_loading_error() 1252 1253 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) 1254 1255 if password is not None and userdata.called == 0: 1256 raise TypeError( 1257 "Password was given but private key is not encrypted.") 1258 1259 assert ( 1260 (password is not None and userdata.called == 1) or 1261 password is None 1262 ) 1263 1264 return convert_func(evp_pkey) 1265 1266 def _handle_key_loading_error(self): 1267 errors = self._consume_errors() 1268 1269 if not errors: 1270 raise ValueError("Could not deserialize key data.") 1271 1272 elif ( 1273 errors[0]._lib_reason_match( 1274 self._lib.ERR_LIB_EVP, self._lib.EVP_R_BAD_DECRYPT 1275 ) or errors[0]._lib_reason_match( 1276 self._lib.ERR_LIB_PKCS12, 1277 self._lib.PKCS12_R_PKCS12_CIPHERFINAL_ERROR 1278 ) 1279 ): 1280 raise ValueError("Bad decrypt. Incorrect password?") 1281 1282 elif ( 1283 errors[0]._lib_reason_match( 1284 self._lib.ERR_LIB_EVP, self._lib.EVP_R_UNKNOWN_PBE_ALGORITHM 1285 ) or errors[0]._lib_reason_match( 1286 self._lib.ERR_LIB_PEM, self._lib.PEM_R_UNSUPPORTED_ENCRYPTION 1287 ) 1288 ): 1289 raise UnsupportedAlgorithm( 1290 "PEM data is encrypted with an unsupported cipher", 1291 _Reasons.UNSUPPORTED_CIPHER 1292 ) 1293 1294 elif any( 1295 error._lib_reason_match( 1296 self._lib.ERR_LIB_EVP, 1297 self._lib.EVP_R_UNSUPPORTED_PRIVATE_KEY_ALGORITHM 1298 ) 1299 for error in errors 1300 ): 1301 raise ValueError("Unsupported public key algorithm.") 1302 1303 else: 1304 assert errors[0].lib in ( 1305 self._lib.ERR_LIB_EVP, 1306 self._lib.ERR_LIB_PEM, 1307 self._lib.ERR_LIB_ASN1, 1308 ) 1309 raise ValueError("Could not deserialize key data.") 1310 1311 def elliptic_curve_supported(self, curve): 1312 try: 1313 curve_nid = self._elliptic_curve_to_nid(curve) 1314 except UnsupportedAlgorithm: 1315 curve_nid = self._lib.NID_undef 1316 1317 group = self._lib.EC_GROUP_new_by_curve_name(curve_nid) 1318 1319 if group == self._ffi.NULL: 1320 errors = self._consume_errors() 1321 self.openssl_assert( 1322 curve_nid == self._lib.NID_undef or 1323 errors[0]._lib_reason_match( 1324 self._lib.ERR_LIB_EC, 1325 self._lib.EC_R_UNKNOWN_GROUP 1326 ) 1327 ) 1328 return False 1329 else: 1330 self.openssl_assert(curve_nid != self._lib.NID_undef) 1331 self._lib.EC_GROUP_free(group) 1332 return True 1333 1334 def elliptic_curve_signature_algorithm_supported( 1335 self, signature_algorithm, curve 1336 ): 1337 # We only support ECDSA right now. 1338 if not isinstance(signature_algorithm, ec.ECDSA): 1339 return False 1340 1341 return self.elliptic_curve_supported(curve) 1342 1343 def generate_elliptic_curve_private_key(self, curve): 1344 """ 1345 Generate a new private key on the named curve. 1346 """ 1347 1348 if self.elliptic_curve_supported(curve): 1349 ec_cdata = self._ec_key_new_by_curve(curve) 1350 1351 res = self._lib.EC_KEY_generate_key(ec_cdata) 1352 self.openssl_assert(res == 1) 1353 1354 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata) 1355 1356 return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey) 1357 else: 1358 raise UnsupportedAlgorithm( 1359 "Backend object does not support {0}.".format(curve.name), 1360 _Reasons.UNSUPPORTED_ELLIPTIC_CURVE 1361 ) 1362 1363 def load_elliptic_curve_private_numbers(self, numbers): 1364 public = numbers.public_numbers 1365 1366 ec_cdata = self._ec_key_new_by_curve(public.curve) 1367 1368 private_value = self._ffi.gc( 1369 self._int_to_bn(numbers.private_value), self._lib.BN_clear_free 1370 ) 1371 res = self._lib.EC_KEY_set_private_key(ec_cdata, private_value) 1372 self.openssl_assert(res == 1) 1373 1374 ec_cdata = self._ec_key_set_public_key_affine_coordinates( 1375 ec_cdata, public.x, public.y) 1376 1377 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata) 1378 1379 return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey) 1380 1381 def load_elliptic_curve_public_numbers(self, numbers): 1382 ec_cdata = self._ec_key_new_by_curve(numbers.curve) 1383 ec_cdata = self._ec_key_set_public_key_affine_coordinates( 1384 ec_cdata, numbers.x, numbers.y) 1385 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata) 1386 1387 return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey) 1388 1389 def load_elliptic_curve_public_bytes(self, curve, point_bytes): 1390 ec_cdata = self._ec_key_new_by_curve(curve) 1391 group = self._lib.EC_KEY_get0_group(ec_cdata) 1392 self.openssl_assert(group != self._ffi.NULL) 1393 point = self._lib.EC_POINT_new(group) 1394 self.openssl_assert(point != self._ffi.NULL) 1395 point = self._ffi.gc(point, self._lib.EC_POINT_free) 1396 with self._tmp_bn_ctx() as bn_ctx: 1397 res = self._lib.EC_POINT_oct2point( 1398 group, point, point_bytes, len(point_bytes), bn_ctx 1399 ) 1400 if res != 1: 1401 self._consume_errors() 1402 raise ValueError("Invalid public bytes for the given curve") 1403 1404 res = self._lib.EC_KEY_set_public_key(ec_cdata, point) 1405 self.openssl_assert(res == 1) 1406 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata) 1407 return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey) 1408 1409 def derive_elliptic_curve_private_key(self, private_value, curve): 1410 ec_cdata = self._ec_key_new_by_curve(curve) 1411 1412 get_func, group = self._ec_key_determine_group_get_func(ec_cdata) 1413 1414 point = self._lib.EC_POINT_new(group) 1415 self.openssl_assert(point != self._ffi.NULL) 1416 point = self._ffi.gc(point, self._lib.EC_POINT_free) 1417 1418 value = self._int_to_bn(private_value) 1419 value = self._ffi.gc(value, self._lib.BN_clear_free) 1420 1421 with self._tmp_bn_ctx() as bn_ctx: 1422 res = self._lib.EC_POINT_mul(group, point, value, self._ffi.NULL, 1423 self._ffi.NULL, bn_ctx) 1424 self.openssl_assert(res == 1) 1425 1426 bn_x = self._lib.BN_CTX_get(bn_ctx) 1427 bn_y = self._lib.BN_CTX_get(bn_ctx) 1428 1429 res = get_func(group, point, bn_x, bn_y, bn_ctx) 1430 self.openssl_assert(res == 1) 1431 1432 res = self._lib.EC_KEY_set_public_key(ec_cdata, point) 1433 self.openssl_assert(res == 1) 1434 private = self._int_to_bn(private_value) 1435 private = self._ffi.gc(private, self._lib.BN_clear_free) 1436 res = self._lib.EC_KEY_set_private_key(ec_cdata, private) 1437 self.openssl_assert(res == 1) 1438 1439 evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata) 1440 1441 return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey) 1442 1443 def _ec_key_new_by_curve(self, curve): 1444 curve_nid = self._elliptic_curve_to_nid(curve) 1445 ec_cdata = self._lib.EC_KEY_new_by_curve_name(curve_nid) 1446 self.openssl_assert(ec_cdata != self._ffi.NULL) 1447 return self._ffi.gc(ec_cdata, self._lib.EC_KEY_free) 1448 1449 def load_der_ocsp_request(self, data): 1450 mem_bio = self._bytes_to_bio(data) 1451 request = self._lib.d2i_OCSP_REQUEST_bio(mem_bio.bio, self._ffi.NULL) 1452 if request == self._ffi.NULL: 1453 self._consume_errors() 1454 raise ValueError("Unable to load OCSP request") 1455 1456 request = self._ffi.gc(request, self._lib.OCSP_REQUEST_free) 1457 return _OCSPRequest(self, request) 1458 1459 def load_der_ocsp_response(self, data): 1460 mem_bio = self._bytes_to_bio(data) 1461 response = self._lib.d2i_OCSP_RESPONSE_bio(mem_bio.bio, self._ffi.NULL) 1462 if response == self._ffi.NULL: 1463 self._consume_errors() 1464 raise ValueError("Unable to load OCSP response") 1465 1466 response = self._ffi.gc(response, self._lib.OCSP_RESPONSE_free) 1467 return _OCSPResponse(self, response) 1468 1469 def create_ocsp_request(self, builder): 1470 ocsp_req = self._lib.OCSP_REQUEST_new() 1471 self.openssl_assert(ocsp_req != self._ffi.NULL) 1472 ocsp_req = self._ffi.gc(ocsp_req, self._lib.OCSP_REQUEST_free) 1473 cert, issuer, algorithm = builder._request 1474 evp_md = self._evp_md_non_null_from_algorithm(algorithm) 1475 certid = self._lib.OCSP_cert_to_id( 1476 evp_md, cert._x509, issuer._x509 1477 ) 1478 self.openssl_assert(certid != self._ffi.NULL) 1479 onereq = self._lib.OCSP_request_add0_id(ocsp_req, certid) 1480 self.openssl_assert(onereq != self._ffi.NULL) 1481 self._create_x509_extensions( 1482 extensions=builder._extensions, 1483 handlers=_OCSP_REQUEST_EXTENSION_ENCODE_HANDLERS, 1484 x509_obj=ocsp_req, 1485 add_func=self._lib.OCSP_REQUEST_add_ext, 1486 gc=True, 1487 ) 1488 return _OCSPRequest(self, ocsp_req) 1489 1490 def _create_ocsp_basic_response(self, builder, private_key, algorithm): 1491 basic = self._lib.OCSP_BASICRESP_new() 1492 self.openssl_assert(basic != self._ffi.NULL) 1493 basic = self._ffi.gc(basic, self._lib.OCSP_BASICRESP_free) 1494 evp_md = self._evp_md_non_null_from_algorithm( 1495 builder._response._algorithm 1496 ) 1497 certid = self._lib.OCSP_cert_to_id( 1498 evp_md, builder._response._cert._x509, 1499 builder._response._issuer._x509 1500 ) 1501 self.openssl_assert(certid != self._ffi.NULL) 1502 certid = self._ffi.gc(certid, self._lib.OCSP_CERTID_free) 1503 if builder._response._revocation_reason is None: 1504 reason = -1 1505 else: 1506 reason = _CRL_ENTRY_REASON_ENUM_TO_CODE[ 1507 builder._response._revocation_reason 1508 ] 1509 if builder._response._revocation_time is None: 1510 rev_time = self._ffi.NULL 1511 else: 1512 rev_time = self._create_asn1_time( 1513 builder._response._revocation_time 1514 ) 1515 1516 next_update = self._ffi.NULL 1517 if builder._response._next_update is not None: 1518 next_update = self._create_asn1_time( 1519 builder._response._next_update 1520 ) 1521 1522 this_update = self._create_asn1_time(builder._response._this_update) 1523 1524 res = self._lib.OCSP_basic_add1_status( 1525 basic, 1526 certid, 1527 builder._response._cert_status.value, 1528 reason, 1529 rev_time, 1530 this_update, 1531 next_update 1532 ) 1533 self.openssl_assert(res != self._ffi.NULL) 1534 # okay, now sign the basic structure 1535 evp_md = self._evp_md_non_null_from_algorithm(algorithm) 1536 responder_cert, responder_encoding = builder._responder_id 1537 flags = self._lib.OCSP_NOCERTS 1538 if responder_encoding is ocsp.OCSPResponderEncoding.HASH: 1539 flags |= self._lib.OCSP_RESPID_KEY 1540 1541 if builder._certs is not None: 1542 for cert in builder._certs: 1543 res = self._lib.OCSP_basic_add1_cert(basic, cert._x509) 1544 self.openssl_assert(res == 1) 1545 1546 self._create_x509_extensions( 1547 extensions=builder._extensions, 1548 handlers=_OCSP_BASICRESP_EXTENSION_ENCODE_HANDLERS, 1549 x509_obj=basic, 1550 add_func=self._lib.OCSP_BASICRESP_add_ext, 1551 gc=True, 1552 ) 1553 1554 res = self._lib.OCSP_basic_sign( 1555 basic, responder_cert._x509, private_key._evp_pkey, 1556 evp_md, self._ffi.NULL, flags 1557 ) 1558 if res != 1: 1559 errors = self._consume_errors() 1560 self.openssl_assert( 1561 errors[0]._lib_reason_match( 1562 self._lib.ERR_LIB_X509, 1563 self._lib.X509_R_KEY_VALUES_MISMATCH 1564 ) 1565 ) 1566 raise ValueError("responder_cert must be signed by private_key") 1567 1568 return basic 1569 1570 def create_ocsp_response(self, response_status, builder, private_key, 1571 algorithm): 1572 if response_status is ocsp.OCSPResponseStatus.SUCCESSFUL: 1573 basic = self._create_ocsp_basic_response( 1574 builder, private_key, algorithm 1575 ) 1576 else: 1577 basic = self._ffi.NULL 1578 1579 ocsp_resp = self._lib.OCSP_response_create( 1580 response_status.value, basic 1581 ) 1582 self.openssl_assert(ocsp_resp != self._ffi.NULL) 1583 ocsp_resp = self._ffi.gc(ocsp_resp, self._lib.OCSP_RESPONSE_free) 1584 return _OCSPResponse(self, ocsp_resp) 1585 1586 def elliptic_curve_exchange_algorithm_supported(self, algorithm, curve): 1587 return ( 1588 self.elliptic_curve_supported(curve) and 1589 isinstance(algorithm, ec.ECDH) 1590 ) 1591 1592 def _ec_cdata_to_evp_pkey(self, ec_cdata): 1593 evp_pkey = self._create_evp_pkey_gc() 1594 res = self._lib.EVP_PKEY_set1_EC_KEY(evp_pkey, ec_cdata) 1595 self.openssl_assert(res == 1) 1596 return evp_pkey 1597 1598 def _elliptic_curve_to_nid(self, curve): 1599 """ 1600 Get the NID for a curve name. 1601 """ 1602 1603 curve_aliases = { 1604 "secp192r1": "prime192v1", 1605 "secp256r1": "prime256v1" 1606 } 1607 1608 curve_name = curve_aliases.get(curve.name, curve.name) 1609 1610 curve_nid = self._lib.OBJ_sn2nid(curve_name.encode()) 1611 if curve_nid == self._lib.NID_undef: 1612 raise UnsupportedAlgorithm( 1613 "{0} is not a supported elliptic curve".format(curve.name), 1614 _Reasons.UNSUPPORTED_ELLIPTIC_CURVE 1615 ) 1616 return curve_nid 1617 1618 @contextmanager 1619 def _tmp_bn_ctx(self): 1620 bn_ctx = self._lib.BN_CTX_new() 1621 self.openssl_assert(bn_ctx != self._ffi.NULL) 1622 bn_ctx = self._ffi.gc(bn_ctx, self._lib.BN_CTX_free) 1623 self._lib.BN_CTX_start(bn_ctx) 1624 try: 1625 yield bn_ctx 1626 finally: 1627 self._lib.BN_CTX_end(bn_ctx) 1628 1629 def _ec_key_determine_group_get_func(self, ctx): 1630 """ 1631 Given an EC_KEY determine the group and what function is required to 1632 get point coordinates. 1633 """ 1634 self.openssl_assert(ctx != self._ffi.NULL) 1635 1636 nid_two_field = self._lib.OBJ_sn2nid(b"characteristic-two-field") 1637 self.openssl_assert(nid_two_field != self._lib.NID_undef) 1638 1639 group = self._lib.EC_KEY_get0_group(ctx) 1640 self.openssl_assert(group != self._ffi.NULL) 1641 1642 method = self._lib.EC_GROUP_method_of(group) 1643 self.openssl_assert(method != self._ffi.NULL) 1644 1645 nid = self._lib.EC_METHOD_get_field_type(method) 1646 self.openssl_assert(nid != self._lib.NID_undef) 1647 1648 if nid == nid_two_field and self._lib.Cryptography_HAS_EC2M: 1649 get_func = self._lib.EC_POINT_get_affine_coordinates_GF2m 1650 else: 1651 get_func = self._lib.EC_POINT_get_affine_coordinates_GFp 1652 1653 assert get_func 1654 1655 return get_func, group 1656 1657 def _ec_key_set_public_key_affine_coordinates(self, ctx, x, y): 1658 """ 1659 Sets the public key point in the EC_KEY context to the affine x and y 1660 values. 1661 """ 1662 1663 if x < 0 or y < 0: 1664 raise ValueError( 1665 "Invalid EC key. Both x and y must be non-negative." 1666 ) 1667 1668 x = self._ffi.gc(self._int_to_bn(x), self._lib.BN_free) 1669 y = self._ffi.gc(self._int_to_bn(y), self._lib.BN_free) 1670 res = self._lib.EC_KEY_set_public_key_affine_coordinates(ctx, x, y) 1671 if res != 1: 1672 self._consume_errors() 1673 raise ValueError("Invalid EC key.") 1674 1675 return ctx 1676 1677 def _private_key_bytes(self, encoding, format, encryption_algorithm, 1678 evp_pkey, cdata): 1679 if not isinstance(format, serialization.PrivateFormat): 1680 raise TypeError( 1681 "format must be an item from the PrivateFormat enum" 1682 ) 1683 1684 # X9.62 encoding is only valid for EC public keys 1685 if encoding is serialization.Encoding.X962: 1686 raise ValueError("X9.62 format is only valid for EC public keys") 1687 1688 # Raw format and encoding are only valid for X25519, Ed25519, X448, and 1689 # Ed448 keys. We capture those cases before this method is called so if 1690 # we see those enum values here it means the caller has passed them to 1691 # a key that doesn't support raw type 1692 if format is serialization.PrivateFormat.Raw: 1693 raise ValueError("raw format is invalid with this key or encoding") 1694 1695 if encoding is serialization.Encoding.Raw: 1696 raise ValueError("raw encoding is invalid with this key or format") 1697 1698 if not isinstance(encryption_algorithm, 1699 serialization.KeySerializationEncryption): 1700 raise TypeError( 1701 "Encryption algorithm must be a KeySerializationEncryption " 1702 "instance" 1703 ) 1704 1705 if isinstance(encryption_algorithm, serialization.NoEncryption): 1706 password = b"" 1707 passlen = 0 1708 evp_cipher = self._ffi.NULL 1709 elif isinstance(encryption_algorithm, 1710 serialization.BestAvailableEncryption): 1711 # This is a curated value that we will update over time. 1712 evp_cipher = self._lib.EVP_get_cipherbyname( 1713 b"aes-256-cbc" 1714 ) 1715 password = encryption_algorithm.password 1716 passlen = len(password) 1717 if passlen > 1023: 1718 raise ValueError( 1719 "Passwords longer than 1023 bytes are not supported by " 1720 "this backend" 1721 ) 1722 else: 1723 raise ValueError("Unsupported encryption type") 1724 1725 key_type = self._lib.EVP_PKEY_id(evp_pkey) 1726 if encoding is serialization.Encoding.PEM: 1727 if format is serialization.PrivateFormat.PKCS8: 1728 write_bio = self._lib.PEM_write_bio_PKCS8PrivateKey 1729 key = evp_pkey 1730 else: 1731 assert format is serialization.PrivateFormat.TraditionalOpenSSL 1732 if key_type == self._lib.EVP_PKEY_RSA: 1733 write_bio = self._lib.PEM_write_bio_RSAPrivateKey 1734 elif key_type == self._lib.EVP_PKEY_DSA: 1735 write_bio = self._lib.PEM_write_bio_DSAPrivateKey 1736 else: 1737 assert key_type == self._lib.EVP_PKEY_EC 1738 write_bio = self._lib.PEM_write_bio_ECPrivateKey 1739 1740 key = cdata 1741 elif encoding is serialization.Encoding.DER: 1742 if format is serialization.PrivateFormat.TraditionalOpenSSL: 1743 if not isinstance( 1744 encryption_algorithm, serialization.NoEncryption 1745 ): 1746 raise ValueError( 1747 "Encryption is not supported for DER encoded " 1748 "traditional OpenSSL keys" 1749 ) 1750 1751 return self._private_key_bytes_traditional_der(key_type, cdata) 1752 else: 1753 assert format is serialization.PrivateFormat.PKCS8 1754 write_bio = self._lib.i2d_PKCS8PrivateKey_bio 1755 key = evp_pkey 1756 else: 1757 raise TypeError("encoding must be Encoding.PEM or Encoding.DER") 1758 1759 bio = self._create_mem_bio_gc() 1760 res = write_bio( 1761 bio, 1762 key, 1763 evp_cipher, 1764 password, 1765 passlen, 1766 self._ffi.NULL, 1767 self._ffi.NULL 1768 ) 1769 self.openssl_assert(res == 1) 1770 return self._read_mem_bio(bio) 1771 1772 def _private_key_bytes_traditional_der(self, key_type, cdata): 1773 if key_type == self._lib.EVP_PKEY_RSA: 1774 write_bio = self._lib.i2d_RSAPrivateKey_bio 1775 elif key_type == self._lib.EVP_PKEY_EC: 1776 write_bio = self._lib.i2d_ECPrivateKey_bio 1777 else: 1778 self.openssl_assert(key_type == self._lib.EVP_PKEY_DSA) 1779 write_bio = self._lib.i2d_DSAPrivateKey_bio 1780 1781 bio = self._create_mem_bio_gc() 1782 res = write_bio(bio, cdata) 1783 self.openssl_assert(res == 1) 1784 return self._read_mem_bio(bio) 1785 1786 def _public_key_bytes(self, encoding, format, key, evp_pkey, cdata): 1787 if not isinstance(encoding, serialization.Encoding): 1788 raise TypeError("encoding must be an item from the Encoding enum") 1789 1790 # Compressed/UncompressedPoint are only valid for EC keys and those 1791 # cases are handled by the ECPublicKey public_bytes method before this 1792 # method is called 1793 if format in (serialization.PublicFormat.UncompressedPoint, 1794 serialization.PublicFormat.CompressedPoint): 1795 raise ValueError("Point formats are not valid for this key type") 1796 1797 # Raw format and encoding are only valid for X25519, Ed25519, X448, and 1798 # Ed448 keys. We capture those cases before this method is called so if 1799 # we see those enum values here it means the caller has passed them to 1800 # a key that doesn't support raw type 1801 if format is serialization.PublicFormat.Raw: 1802 raise ValueError("raw format is invalid with this key or encoding") 1803 1804 if encoding is serialization.Encoding.Raw: 1805 raise ValueError("raw encoding is invalid with this key or format") 1806 1807 if ( 1808 format is serialization.PublicFormat.OpenSSH or 1809 encoding is serialization.Encoding.OpenSSH 1810 ): 1811 if ( 1812 format is not serialization.PublicFormat.OpenSSH or 1813 encoding is not serialization.Encoding.OpenSSH 1814 ): 1815 raise ValueError( 1816 "OpenSSH format must be used with OpenSSH encoding" 1817 ) 1818 return self._openssh_public_key_bytes(key) 1819 elif format is serialization.PublicFormat.SubjectPublicKeyInfo: 1820 if encoding is serialization.Encoding.PEM: 1821 write_bio = self._lib.PEM_write_bio_PUBKEY 1822 else: 1823 assert encoding is serialization.Encoding.DER 1824 write_bio = self._lib.i2d_PUBKEY_bio 1825 1826 key = evp_pkey 1827 elif format is serialization.PublicFormat.PKCS1: 1828 # Only RSA is supported here. 1829 assert self._lib.EVP_PKEY_id(evp_pkey) == self._lib.EVP_PKEY_RSA 1830 if encoding is serialization.Encoding.PEM: 1831 write_bio = self._lib.PEM_write_bio_RSAPublicKey 1832 else: 1833 assert encoding is serialization.Encoding.DER 1834 write_bio = self._lib.i2d_RSAPublicKey_bio 1835 1836 key = cdata 1837 else: 1838 raise TypeError( 1839 "format must be an item from the PublicFormat enum" 1840 ) 1841 1842 bio = self._create_mem_bio_gc() 1843 res = write_bio(bio, key) 1844 self.openssl_assert(res == 1) 1845 return self._read_mem_bio(bio) 1846 1847 def _openssh_public_key_bytes(self, key): 1848 if isinstance(key, rsa.RSAPublicKey): 1849 public_numbers = key.public_numbers() 1850 return b"ssh-rsa " + base64.b64encode( 1851 ssh._ssh_write_string(b"ssh-rsa") + 1852 ssh._ssh_write_mpint(public_numbers.e) + 1853 ssh._ssh_write_mpint(public_numbers.n) 1854 ) 1855 elif isinstance(key, dsa.DSAPublicKey): 1856 public_numbers = key.public_numbers() 1857 parameter_numbers = public_numbers.parameter_numbers 1858 return b"ssh-dss " + base64.b64encode( 1859 ssh._ssh_write_string(b"ssh-dss") + 1860 ssh._ssh_write_mpint(parameter_numbers.p) + 1861 ssh._ssh_write_mpint(parameter_numbers.q) + 1862 ssh._ssh_write_mpint(parameter_numbers.g) + 1863 ssh._ssh_write_mpint(public_numbers.y) 1864 ) 1865 else: 1866 assert isinstance(key, ec.EllipticCurvePublicKey) 1867 public_numbers = key.public_numbers() 1868 try: 1869 curve_name = { 1870 ec.SECP256R1: b"nistp256", 1871 ec.SECP384R1: b"nistp384", 1872 ec.SECP521R1: b"nistp521", 1873 }[type(public_numbers.curve)] 1874 except KeyError: 1875 raise ValueError( 1876 "Only SECP256R1, SECP384R1, and SECP521R1 curves are " 1877 "supported by the SSH public key format" 1878 ) 1879 1880 point = key.public_bytes( 1881 serialization.Encoding.X962, 1882 serialization.PublicFormat.UncompressedPoint 1883 ) 1884 return b"ecdsa-sha2-" + curve_name + b" " + base64.b64encode( 1885 ssh._ssh_write_string(b"ecdsa-sha2-" + curve_name) + 1886 ssh._ssh_write_string(curve_name) + 1887 ssh._ssh_write_string(point) 1888 ) 1889 1890 def _parameter_bytes(self, encoding, format, cdata): 1891 if encoding is serialization.Encoding.OpenSSH: 1892 raise TypeError( 1893 "OpenSSH encoding is not supported" 1894 ) 1895 1896 # Only DH is supported here currently. 1897 q = self._ffi.new("BIGNUM **") 1898 self._lib.DH_get0_pqg(cdata, 1899 self._ffi.NULL, 1900 q, 1901 self._ffi.NULL) 1902 if encoding is serialization.Encoding.PEM: 1903 if q[0] != self._ffi.NULL: 1904 write_bio = self._lib.PEM_write_bio_DHxparams 1905 else: 1906 write_bio = self._lib.PEM_write_bio_DHparams 1907 elif encoding is serialization.Encoding.DER: 1908 if q[0] != self._ffi.NULL: 1909 write_bio = self._lib.Cryptography_i2d_DHxparams_bio 1910 else: 1911 write_bio = self._lib.i2d_DHparams_bio 1912 else: 1913 raise TypeError("encoding must be an item from the Encoding enum") 1914 1915 bio = self._create_mem_bio_gc() 1916 res = write_bio(bio, cdata) 1917 self.openssl_assert(res == 1) 1918 return self._read_mem_bio(bio) 1919 1920 def generate_dh_parameters(self, generator, key_size): 1921 if key_size < 512: 1922 raise ValueError("DH key_size must be at least 512 bits") 1923 1924 if generator not in (2, 5): 1925 raise ValueError("DH generator must be 2 or 5") 1926 1927 dh_param_cdata = self._lib.DH_new() 1928 self.openssl_assert(dh_param_cdata != self._ffi.NULL) 1929 dh_param_cdata = self._ffi.gc(dh_param_cdata, self._lib.DH_free) 1930 1931 res = self._lib.DH_generate_parameters_ex( 1932 dh_param_cdata, 1933 key_size, 1934 generator, 1935 self._ffi.NULL 1936 ) 1937 self.openssl_assert(res == 1) 1938 1939 return _DHParameters(self, dh_param_cdata) 1940 1941 def _dh_cdata_to_evp_pkey(self, dh_cdata): 1942 evp_pkey = self._create_evp_pkey_gc() 1943 res = self._lib.EVP_PKEY_set1_DH(evp_pkey, dh_cdata) 1944 self.openssl_assert(res == 1) 1945 return evp_pkey 1946 1947 def generate_dh_private_key(self, parameters): 1948 dh_key_cdata = _dh_params_dup(parameters._dh_cdata, self) 1949 1950 res = self._lib.DH_generate_key(dh_key_cdata) 1951 self.openssl_assert(res == 1) 1952 1953 evp_pkey = self._dh_cdata_to_evp_pkey(dh_key_cdata) 1954 1955 return _DHPrivateKey(self, dh_key_cdata, evp_pkey) 1956 1957 def generate_dh_private_key_and_parameters(self, generator, key_size): 1958 return self.generate_dh_private_key( 1959 self.generate_dh_parameters(generator, key_size)) 1960 1961 def load_dh_private_numbers(self, numbers): 1962 parameter_numbers = numbers.public_numbers.parameter_numbers 1963 1964 dh_cdata = self._lib.DH_new() 1965 self.openssl_assert(dh_cdata != self._ffi.NULL) 1966 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free) 1967 1968 p = self._int_to_bn(parameter_numbers.p) 1969 g = self._int_to_bn(parameter_numbers.g) 1970 1971 if parameter_numbers.q is not None: 1972 q = self._int_to_bn(parameter_numbers.q) 1973 else: 1974 q = self._ffi.NULL 1975 1976 pub_key = self._int_to_bn(numbers.public_numbers.y) 1977 priv_key = self._int_to_bn(numbers.x) 1978 1979 res = self._lib.DH_set0_pqg(dh_cdata, p, q, g) 1980 self.openssl_assert(res == 1) 1981 1982 res = self._lib.DH_set0_key(dh_cdata, pub_key, priv_key) 1983 self.openssl_assert(res == 1) 1984 1985 codes = self._ffi.new("int[]", 1) 1986 res = self._lib.Cryptography_DH_check(dh_cdata, codes) 1987 self.openssl_assert(res == 1) 1988 1989 # DH_check will return DH_NOT_SUITABLE_GENERATOR if p % 24 does not 1990 # equal 11 when the generator is 2 (a quadratic nonresidue). 1991 # We want to ignore that error because p % 24 == 23 is also fine. 1992 # Specifically, g is then a quadratic residue. Within the context of 1993 # Diffie-Hellman this means it can only generate half the possible 1994 # values. That sounds bad, but quadratic nonresidues leak a bit of 1995 # the key to the attacker in exchange for having the full key space 1996 # available. See: https://crypto.stackexchange.com/questions/12961 1997 if codes[0] != 0 and not ( 1998 parameter_numbers.g == 2 and 1999 codes[0] ^ self._lib.DH_NOT_SUITABLE_GENERATOR == 0 2000 ): 2001 raise ValueError( 2002 "DH private numbers did not pass safety checks." 2003 ) 2004 2005 evp_pkey = self._dh_cdata_to_evp_pkey(dh_cdata) 2006 2007 return _DHPrivateKey(self, dh_cdata, evp_pkey) 2008 2009 def load_dh_public_numbers(self, numbers): 2010 dh_cdata = self._lib.DH_new() 2011 self.openssl_assert(dh_cdata != self._ffi.NULL) 2012 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free) 2013 2014 parameter_numbers = numbers.parameter_numbers 2015 2016 p = self._int_to_bn(parameter_numbers.p) 2017 g = self._int_to_bn(parameter_numbers.g) 2018 2019 if parameter_numbers.q is not None: 2020 q = self._int_to_bn(parameter_numbers.q) 2021 else: 2022 q = self._ffi.NULL 2023 2024 pub_key = self._int_to_bn(numbers.y) 2025 2026 res = self._lib.DH_set0_pqg(dh_cdata, p, q, g) 2027 self.openssl_assert(res == 1) 2028 2029 res = self._lib.DH_set0_key(dh_cdata, pub_key, self._ffi.NULL) 2030 self.openssl_assert(res == 1) 2031 2032 evp_pkey = self._dh_cdata_to_evp_pkey(dh_cdata) 2033 2034 return _DHPublicKey(self, dh_cdata, evp_pkey) 2035 2036 def load_dh_parameter_numbers(self, numbers): 2037 dh_cdata = self._lib.DH_new() 2038 self.openssl_assert(dh_cdata != self._ffi.NULL) 2039 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free) 2040 2041 p = self._int_to_bn(numbers.p) 2042 g = self._int_to_bn(numbers.g) 2043 2044 if numbers.q is not None: 2045 q = self._int_to_bn(numbers.q) 2046 else: 2047 q = self._ffi.NULL 2048 2049 res = self._lib.DH_set0_pqg(dh_cdata, p, q, g) 2050 self.openssl_assert(res == 1) 2051 2052 return _DHParameters(self, dh_cdata) 2053 2054 def dh_parameters_supported(self, p, g, q=None): 2055 dh_cdata = self._lib.DH_new() 2056 self.openssl_assert(dh_cdata != self._ffi.NULL) 2057 dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free) 2058 2059 p = self._int_to_bn(p) 2060 g = self._int_to_bn(g) 2061 2062 if q is not None: 2063 q = self._int_to_bn(q) 2064 else: 2065 q = self._ffi.NULL 2066 2067 res = self._lib.DH_set0_pqg(dh_cdata, p, q, g) 2068 self.openssl_assert(res == 1) 2069 2070 codes = self._ffi.new("int[]", 1) 2071 res = self._lib.Cryptography_DH_check(dh_cdata, codes) 2072 self.openssl_assert(res == 1) 2073 2074 return codes[0] == 0 2075 2076 def dh_x942_serialization_supported(self): 2077 return self._lib.Cryptography_HAS_EVP_PKEY_DHX == 1 2078 2079 def x509_name_bytes(self, name): 2080 x509_name = _encode_name_gc(self, name) 2081 pp = self._ffi.new("unsigned char **") 2082 res = self._lib.i2d_X509_NAME(x509_name, pp) 2083 self.openssl_assert(pp[0] != self._ffi.NULL) 2084 pp = self._ffi.gc( 2085 pp, lambda pointer: self._lib.OPENSSL_free(pointer[0]) 2086 ) 2087 self.openssl_assert(res > 0) 2088 return self._ffi.buffer(pp[0], res)[:] 2089 2090 def x25519_load_public_bytes(self, data): 2091 # When we drop support for CRYPTOGRAPHY_OPENSSL_LESS_THAN_111 we can 2092 # switch this to EVP_PKEY_new_raw_public_key 2093 if len(data) != 32: 2094 raise ValueError("An X25519 public key is 32 bytes long") 2095 2096 evp_pkey = self._create_evp_pkey_gc() 2097 res = self._lib.EVP_PKEY_set_type(evp_pkey, self._lib.NID_X25519) 2098 backend.openssl_assert(res == 1) 2099 res = self._lib.EVP_PKEY_set1_tls_encodedpoint( 2100 evp_pkey, data, len(data) 2101 ) 2102 backend.openssl_assert(res == 1) 2103 return _X25519PublicKey(self, evp_pkey) 2104 2105 def x25519_load_private_bytes(self, data): 2106 # When we drop support for CRYPTOGRAPHY_OPENSSL_LESS_THAN_111 we can 2107 # switch this to EVP_PKEY_new_raw_private_key and drop the 2108 # zeroed_bytearray garbage. 2109 # OpenSSL only has facilities for loading PKCS8 formatted private 2110 # keys using the algorithm identifiers specified in 2111 # https://tools.ietf.org/html/draft-ietf-curdle-pkix-09. 2112 # This is the standard PKCS8 prefix for a 32 byte X25519 key. 2113 # The form is: 2114 # 0:d=0 hl=2 l= 46 cons: SEQUENCE 2115 # 2:d=1 hl=2 l= 1 prim: INTEGER :00 2116 # 5:d=1 hl=2 l= 5 cons: SEQUENCE 2117 # 7:d=2 hl=2 l= 3 prim: OBJECT :1.3.101.110 2118 # 12:d=1 hl=2 l= 34 prim: OCTET STRING (the key) 2119 # Of course there's a bit more complexity. In reality OCTET STRING 2120 # contains an OCTET STRING of length 32! So the last two bytes here 2121 # are \x04\x20, which is an OCTET STRING of length 32. 2122 if len(data) != 32: 2123 raise ValueError("An X25519 private key is 32 bytes long") 2124 2125 pkcs8_prefix = b'0.\x02\x01\x000\x05\x06\x03+en\x04"\x04 ' 2126 with self._zeroed_bytearray(48) as ba: 2127 ba[0:16] = pkcs8_prefix 2128 ba[16:] = data 2129 bio = self._bytes_to_bio(ba) 2130 evp_pkey = backend._lib.d2i_PrivateKey_bio(bio.bio, self._ffi.NULL) 2131 2132 self.openssl_assert(evp_pkey != self._ffi.NULL) 2133 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) 2134 self.openssl_assert( 2135 self._lib.EVP_PKEY_id(evp_pkey) == self._lib.EVP_PKEY_X25519 2136 ) 2137 return _X25519PrivateKey(self, evp_pkey) 2138 2139 def _evp_pkey_keygen_gc(self, nid): 2140 evp_pkey_ctx = self._lib.EVP_PKEY_CTX_new_id(nid, self._ffi.NULL) 2141 self.openssl_assert(evp_pkey_ctx != self._ffi.NULL) 2142 evp_pkey_ctx = self._ffi.gc(evp_pkey_ctx, self._lib.EVP_PKEY_CTX_free) 2143 res = self._lib.EVP_PKEY_keygen_init(evp_pkey_ctx) 2144 self.openssl_assert(res == 1) 2145 evp_ppkey = self._ffi.new("EVP_PKEY **") 2146 res = self._lib.EVP_PKEY_keygen(evp_pkey_ctx, evp_ppkey) 2147 self.openssl_assert(res == 1) 2148 self.openssl_assert(evp_ppkey[0] != self._ffi.NULL) 2149 evp_pkey = self._ffi.gc(evp_ppkey[0], self._lib.EVP_PKEY_free) 2150 return evp_pkey 2151 2152 def x25519_generate_key(self): 2153 evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_X25519) 2154 return _X25519PrivateKey(self, evp_pkey) 2155 2156 def x25519_supported(self): 2157 return self._lib.CRYPTOGRAPHY_OPENSSL_110_OR_GREATER 2158 2159 def x448_load_public_bytes(self, data): 2160 if len(data) != 56: 2161 raise ValueError("An X448 public key is 56 bytes long") 2162 2163 evp_pkey = self._lib.EVP_PKEY_new_raw_public_key( 2164 self._lib.NID_X448, self._ffi.NULL, data, len(data) 2165 ) 2166 self.openssl_assert(evp_pkey != self._ffi.NULL) 2167 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) 2168 return _X448PublicKey(self, evp_pkey) 2169 2170 def x448_load_private_bytes(self, data): 2171 if len(data) != 56: 2172 raise ValueError("An X448 private key is 56 bytes long") 2173 2174 data_ptr = self._ffi.from_buffer(data) 2175 evp_pkey = self._lib.EVP_PKEY_new_raw_private_key( 2176 self._lib.NID_X448, self._ffi.NULL, data_ptr, len(data) 2177 ) 2178 self.openssl_assert(evp_pkey != self._ffi.NULL) 2179 evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free) 2180 return _X448PrivateKey(self, evp_pkey) 2181 2182 def x448_generate_key(self): 2183 evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_X448) 2184 return _X448PrivateKey(self, evp_pkey) 2185 2186 def x448_supported(self): 2187 return not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111 2188 2189 def derive_scrypt(self, key_material, salt, length, n, r, p): 2190 buf = self._ffi.new("unsigned char[]", length) 2191 key_material_ptr = self._ffi.from_buffer(key_material) 2192 res = self._lib.EVP_PBE_scrypt( 2193 key_material_ptr, len(key_material), salt, len(salt), n, r, p, 2194 scrypt._MEM_LIMIT, buf, length 2195 ) 2196 if res != 1: 2197 errors = self._consume_errors() 2198 if not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111: 2199 # This error is only added to the stack in 1.1.1+ 2200 self.openssl_assert( 2201 errors[0]._lib_reason_match( 2202 self._lib.ERR_LIB_EVP, 2203 self._lib.ERR_R_MALLOC_FAILURE 2204 ) or 2205 errors[0]._lib_reason_match( 2206 self._lib.ERR_LIB_EVP, 2207 self._lib.EVP_R_MEMORY_LIMIT_EXCEEDED 2208 ) 2209 ) 2210 2211 # memory required formula explained here: 2212 # https://blog.filippo.io/the-scrypt-parameters/ 2213 min_memory = 128 * n * r // (1024**2) 2214 raise MemoryError( 2215 "Not enough memory to derive key. These parameters require" 2216 " {} MB of memory.".format(min_memory) 2217 ) 2218 return self._ffi.buffer(buf)[:] 2219 2220 def aead_cipher_supported(self, cipher): 2221 cipher_name = aead._aead_cipher_name(cipher) 2222 return ( 2223 self._lib.EVP_get_cipherbyname(cipher_name) != self._ffi.NULL 2224 ) 2225 2226 @contextlib.contextmanager 2227 def _zeroed_bytearray(self, length): 2228 """ 2229 This method creates a bytearray, which we copy data into (hopefully 2230 also from a mutable buffer that can be dynamically erased!), and then 2231 zero when we're done. 2232 """ 2233 ba = bytearray(length) 2234 try: 2235 yield ba 2236 finally: 2237 self._zero_data(ba, length) 2238 2239 def _zero_data(self, data, length): 2240 # We clear things this way because at the moment we're not 2241 # sure of a better way that can guarantee it overwrites the 2242 # memory of a bytearray and doesn't just replace the underlying char *. 2243 for i in range(length): 2244 data[i] = 0 2245 2246 @contextlib.contextmanager 2247 def _zeroed_null_terminated_buf(self, data): 2248 """ 2249 This method takes bytes, which can be a bytestring or a mutable 2250 buffer like a bytearray, and yields a null-terminated version of that 2251 data. This is required because PKCS12_parse doesn't take a length with 2252 its password char * and ffi.from_buffer doesn't provide null 2253 termination. So, to support zeroing the data via bytearray we 2254 need to build this ridiculous construct that copies the memory, but 2255 zeroes it after use. 2256 """ 2257 if data is None: 2258 yield self._ffi.NULL 2259 else: 2260 data_len = len(data) 2261 buf = self._ffi.new("char[]", data_len + 1) 2262 self._ffi.memmove(buf, data, data_len) 2263 try: 2264 yield buf 2265 finally: 2266 # Cast to a uint8_t * so we can assign by integer 2267 self._zero_data(self._ffi.cast("uint8_t *", buf), data_len) 2268 2269 def load_key_and_certificates_from_pkcs12(self, data, password): 2270 if password is not None: 2271 utils._check_byteslike("password", password) 2272 2273 bio = self._bytes_to_bio(data) 2274 p12 = self._lib.d2i_PKCS12_bio(bio.bio, self._ffi.NULL) 2275 if p12 == self._ffi.NULL: 2276 self._consume_errors() 2277 raise ValueError("Could not deserialize PKCS12 data") 2278 2279 p12 = self._ffi.gc(p12, self._lib.PKCS12_free) 2280 evp_pkey_ptr = self._ffi.new("EVP_PKEY **") 2281 x509_ptr = self._ffi.new("X509 **") 2282 sk_x509_ptr = self._ffi.new("Cryptography_STACK_OF_X509 **") 2283 with self._zeroed_null_terminated_buf(password) as password_buf: 2284 res = self._lib.PKCS12_parse( 2285 p12, password_buf, evp_pkey_ptr, x509_ptr, sk_x509_ptr 2286 ) 2287 2288 if res == 0: 2289 self._consume_errors() 2290 raise ValueError("Invalid password or PKCS12 data") 2291 2292 cert = None 2293 key = None 2294 additional_certificates = [] 2295 2296 if evp_pkey_ptr[0] != self._ffi.NULL: 2297 evp_pkey = self._ffi.gc(evp_pkey_ptr[0], self._lib.EVP_PKEY_free) 2298 key = self._evp_pkey_to_private_key(evp_pkey) 2299 2300 if x509_ptr[0] != self._ffi.NULL: 2301 x509 = self._ffi.gc(x509_ptr[0], self._lib.X509_free) 2302 cert = _Certificate(self, x509) 2303 2304 if sk_x509_ptr[0] != self._ffi.NULL: 2305 sk_x509 = self._ffi.gc(sk_x509_ptr[0], self._lib.sk_X509_free) 2306 num = self._lib.sk_X509_num(sk_x509_ptr[0]) 2307 for i in range(num): 2308 x509 = self._lib.sk_X509_value(sk_x509, i) 2309 x509 = self._ffi.gc(x509, self._lib.X509_free) 2310 self.openssl_assert(x509 != self._ffi.NULL) 2311 additional_certificates.append(_Certificate(self, x509)) 2312 2313 return (key, cert, additional_certificates) 2314 2315 2316class GetCipherByName(object): 2317 def __init__(self, fmt): 2318 self._fmt = fmt 2319 2320 def __call__(self, backend, cipher, mode): 2321 cipher_name = self._fmt.format(cipher=cipher, mode=mode).lower() 2322 return backend._lib.EVP_get_cipherbyname(cipher_name.encode("ascii")) 2323 2324 2325def _get_xts_cipher(backend, cipher, mode): 2326 cipher_name = "aes-{0}-xts".format(cipher.key_size // 2) 2327 return backend._lib.EVP_get_cipherbyname(cipher_name.encode("ascii")) 2328 2329 2330backend = Backend() 2331