• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
4
5from __future__ import absolute_import, division, print_function
6
7import base64
8import collections
9import contextlib
10import itertools
11from contextlib import contextmanager
12
13import asn1crypto.core
14
15import six
16from six.moves import range
17
18from cryptography import utils, x509
19from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
20from cryptography.hazmat.backends.interfaces import (
21    CMACBackend, CipherBackend, DERSerializationBackend, DHBackend, DSABackend,
22    EllipticCurveBackend, HMACBackend, HashBackend, PBKDF2HMACBackend,
23    PEMSerializationBackend, RSABackend, ScryptBackend, X509Backend
24)
25from cryptography.hazmat.backends.openssl import aead
26from cryptography.hazmat.backends.openssl.ciphers import _CipherContext
27from cryptography.hazmat.backends.openssl.cmac import _CMACContext
28from cryptography.hazmat.backends.openssl.decode_asn1 import (
29    _CRL_ENTRY_REASON_ENUM_TO_CODE, _Integers
30)
31from cryptography.hazmat.backends.openssl.dh import (
32    _DHParameters, _DHPrivateKey, _DHPublicKey, _dh_params_dup
33)
34from cryptography.hazmat.backends.openssl.dsa import (
35    _DSAParameters, _DSAPrivateKey, _DSAPublicKey
36)
37from cryptography.hazmat.backends.openssl.ec import (
38    _EllipticCurvePrivateKey, _EllipticCurvePublicKey
39)
40from cryptography.hazmat.backends.openssl.encode_asn1 import (
41    _CRL_ENTRY_EXTENSION_ENCODE_HANDLERS,
42    _CRL_EXTENSION_ENCODE_HANDLERS, _EXTENSION_ENCODE_HANDLERS,
43    _OCSP_BASICRESP_EXTENSION_ENCODE_HANDLERS,
44    _OCSP_REQUEST_EXTENSION_ENCODE_HANDLERS,
45    _encode_asn1_int_gc, _encode_asn1_str_gc, _encode_name_gc, _txt2obj_gc,
46)
47from cryptography.hazmat.backends.openssl.hashes import _HashContext
48from cryptography.hazmat.backends.openssl.hmac import _HMACContext
49from cryptography.hazmat.backends.openssl.ocsp import (
50    _OCSPRequest, _OCSPResponse
51)
52from cryptography.hazmat.backends.openssl.rsa import (
53    _RSAPrivateKey, _RSAPublicKey
54)
55from cryptography.hazmat.backends.openssl.x25519 import (
56    _X25519PrivateKey, _X25519PublicKey
57)
58from cryptography.hazmat.backends.openssl.x448 import (
59    _X448PrivateKey, _X448PublicKey
60)
61from cryptography.hazmat.backends.openssl.x509 import (
62    _Certificate, _CertificateRevocationList,
63    _CertificateSigningRequest, _RevokedCertificate
64)
65from cryptography.hazmat.bindings.openssl import binding
66from cryptography.hazmat.primitives import hashes, serialization
67from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa
68from cryptography.hazmat.primitives.asymmetric.padding import (
69    MGF1, OAEP, PKCS1v15, PSS
70)
71from cryptography.hazmat.primitives.ciphers.algorithms import (
72    AES, ARC4, Blowfish, CAST5, Camellia, ChaCha20, IDEA, SEED, TripleDES
73)
74from cryptography.hazmat.primitives.ciphers.modes import (
75    CBC, CFB, CFB8, CTR, ECB, GCM, OFB, XTS
76)
77from cryptography.hazmat.primitives.kdf import scrypt
78from cryptography.hazmat.primitives.serialization import ssh
79from cryptography.x509 import ocsp
80
81
82_MemoryBIO = collections.namedtuple("_MemoryBIO", ["bio", "char_ptr"])
83
84
85@utils.register_interface(CipherBackend)
86@utils.register_interface(CMACBackend)
87@utils.register_interface(DERSerializationBackend)
88@utils.register_interface(DHBackend)
89@utils.register_interface(DSABackend)
90@utils.register_interface(EllipticCurveBackend)
91@utils.register_interface(HashBackend)
92@utils.register_interface(HMACBackend)
93@utils.register_interface(PBKDF2HMACBackend)
94@utils.register_interface(RSABackend)
95@utils.register_interface(PEMSerializationBackend)
96@utils.register_interface(X509Backend)
97@utils.register_interface_if(
98    binding.Binding().lib.Cryptography_HAS_SCRYPT, ScryptBackend
99)
100class Backend(object):
101    """
102    OpenSSL API binding interfaces.
103    """
104    name = "openssl"
105
106    def __init__(self):
107        self._binding = binding.Binding()
108        self._ffi = self._binding.ffi
109        self._lib = self._binding.lib
110
111        self._cipher_registry = {}
112        self._register_default_ciphers()
113        self.activate_osrandom_engine()
114        self._dh_types = [self._lib.EVP_PKEY_DH]
115        if self._lib.Cryptography_HAS_EVP_PKEY_DHX:
116            self._dh_types.append(self._lib.EVP_PKEY_DHX)
117
118    def openssl_assert(self, ok):
119        return binding._openssl_assert(self._lib, ok)
120
121    def activate_builtin_random(self):
122        # Obtain a new structural reference.
123        e = self._lib.ENGINE_get_default_RAND()
124        if e != self._ffi.NULL:
125            self._lib.ENGINE_unregister_RAND(e)
126            # Reset the RNG to use the new engine.
127            self._lib.RAND_cleanup()
128            # decrement the structural reference from get_default_RAND
129            res = self._lib.ENGINE_finish(e)
130            self.openssl_assert(res == 1)
131
132    @contextlib.contextmanager
133    def _get_osurandom_engine(self):
134        # Fetches an engine by id and returns it. This creates a structural
135        # reference.
136        e = self._lib.ENGINE_by_id(self._binding._osrandom_engine_id)
137        self.openssl_assert(e != self._ffi.NULL)
138        # Initialize the engine for use. This adds a functional reference.
139        res = self._lib.ENGINE_init(e)
140        self.openssl_assert(res == 1)
141
142        try:
143            yield e
144        finally:
145            # Decrement the structural ref incremented by ENGINE_by_id.
146            res = self._lib.ENGINE_free(e)
147            self.openssl_assert(res == 1)
148            # Decrement the functional ref incremented by ENGINE_init.
149            res = self._lib.ENGINE_finish(e)
150            self.openssl_assert(res == 1)
151
152    def activate_osrandom_engine(self):
153        # Unregister and free the current engine.
154        self.activate_builtin_random()
155        with self._get_osurandom_engine() as e:
156            # Set the engine as the default RAND provider.
157            res = self._lib.ENGINE_set_default_RAND(e)
158            self.openssl_assert(res == 1)
159        # Reset the RNG to use the new engine.
160        self._lib.RAND_cleanup()
161
162    def osrandom_engine_implementation(self):
163        buf = self._ffi.new("char[]", 64)
164        with self._get_osurandom_engine() as e:
165            res = self._lib.ENGINE_ctrl_cmd(e, b"get_implementation",
166                                            len(buf), buf,
167                                            self._ffi.NULL, 0)
168            self.openssl_assert(res > 0)
169        return self._ffi.string(buf).decode('ascii')
170
171    def openssl_version_text(self):
172        """
173        Friendly string name of the loaded OpenSSL library. This is not
174        necessarily the same version as it was compiled against.
175
176        Example: OpenSSL 1.0.1e 11 Feb 2013
177        """
178        return self._ffi.string(
179            self._lib.OpenSSL_version(self._lib.OPENSSL_VERSION)
180        ).decode("ascii")
181
182    def openssl_version_number(self):
183        return self._lib.OpenSSL_version_num()
184
185    def create_hmac_ctx(self, key, algorithm):
186        return _HMACContext(self, key, algorithm)
187
188    def _evp_md_from_algorithm(self, algorithm):
189        if algorithm.name == "blake2b" or algorithm.name == "blake2s":
190            alg = "{0}{1}".format(
191                algorithm.name, algorithm.digest_size * 8
192            ).encode("ascii")
193        else:
194            alg = algorithm.name.encode("ascii")
195
196        evp_md = self._lib.EVP_get_digestbyname(alg)
197        return evp_md
198
199    def _evp_md_non_null_from_algorithm(self, algorithm):
200        evp_md = self._evp_md_from_algorithm(algorithm)
201        self.openssl_assert(evp_md != self._ffi.NULL)
202        return evp_md
203
204    def hash_supported(self, algorithm):
205        evp_md = self._evp_md_from_algorithm(algorithm)
206        return evp_md != self._ffi.NULL
207
208    def hmac_supported(self, algorithm):
209        return self.hash_supported(algorithm)
210
211    def create_hash_ctx(self, algorithm):
212        return _HashContext(self, algorithm)
213
214    def cipher_supported(self, cipher, mode):
215        try:
216            adapter = self._cipher_registry[type(cipher), type(mode)]
217        except KeyError:
218            return False
219        evp_cipher = adapter(self, cipher, mode)
220        return self._ffi.NULL != evp_cipher
221
222    def register_cipher_adapter(self, cipher_cls, mode_cls, adapter):
223        if (cipher_cls, mode_cls) in self._cipher_registry:
224            raise ValueError("Duplicate registration for: {0} {1}.".format(
225                cipher_cls, mode_cls)
226            )
227        self._cipher_registry[cipher_cls, mode_cls] = adapter
228
229    def _register_default_ciphers(self):
230        for mode_cls in [CBC, CTR, ECB, OFB, CFB, CFB8, GCM]:
231            self.register_cipher_adapter(
232                AES,
233                mode_cls,
234                GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}")
235            )
236        for mode_cls in [CBC, CTR, ECB, OFB, CFB]:
237            self.register_cipher_adapter(
238                Camellia,
239                mode_cls,
240                GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}")
241            )
242        for mode_cls in [CBC, CFB, CFB8, OFB]:
243            self.register_cipher_adapter(
244                TripleDES,
245                mode_cls,
246                GetCipherByName("des-ede3-{mode.name}")
247            )
248        self.register_cipher_adapter(
249            TripleDES,
250            ECB,
251            GetCipherByName("des-ede3")
252        )
253        for mode_cls in [CBC, CFB, OFB, ECB]:
254            self.register_cipher_adapter(
255                Blowfish,
256                mode_cls,
257                GetCipherByName("bf-{mode.name}")
258            )
259        for mode_cls in [CBC, CFB, OFB, ECB]:
260            self.register_cipher_adapter(
261                SEED,
262                mode_cls,
263                GetCipherByName("seed-{mode.name}")
264            )
265        for cipher_cls, mode_cls in itertools.product(
266            [CAST5, IDEA],
267            [CBC, OFB, CFB, ECB],
268        ):
269            self.register_cipher_adapter(
270                cipher_cls,
271                mode_cls,
272                GetCipherByName("{cipher.name}-{mode.name}")
273            )
274        self.register_cipher_adapter(
275            ARC4,
276            type(None),
277            GetCipherByName("rc4")
278        )
279        self.register_cipher_adapter(
280            ChaCha20,
281            type(None),
282            GetCipherByName("chacha20")
283        )
284        self.register_cipher_adapter(AES, XTS, _get_xts_cipher)
285
286    def create_symmetric_encryption_ctx(self, cipher, mode):
287        return _CipherContext(self, cipher, mode, _CipherContext._ENCRYPT)
288
289    def create_symmetric_decryption_ctx(self, cipher, mode):
290        return _CipherContext(self, cipher, mode, _CipherContext._DECRYPT)
291
292    def pbkdf2_hmac_supported(self, algorithm):
293        return self.hmac_supported(algorithm)
294
295    def derive_pbkdf2_hmac(self, algorithm, length, salt, iterations,
296                           key_material):
297        buf = self._ffi.new("unsigned char[]", length)
298        evp_md = self._evp_md_non_null_from_algorithm(algorithm)
299        key_material_ptr = self._ffi.from_buffer(key_material)
300        res = self._lib.PKCS5_PBKDF2_HMAC(
301            key_material_ptr,
302            len(key_material),
303            salt,
304            len(salt),
305            iterations,
306            evp_md,
307            length,
308            buf
309        )
310        self.openssl_assert(res == 1)
311        return self._ffi.buffer(buf)[:]
312
313    def _consume_errors(self):
314        return binding._consume_errors(self._lib)
315
316    def _bn_to_int(self, bn):
317        assert bn != self._ffi.NULL
318
319        if not six.PY2:
320            # Python 3 has constant time from_bytes, so use that.
321            bn_num_bytes = self._lib.BN_num_bytes(bn)
322            bin_ptr = self._ffi.new("unsigned char[]", bn_num_bytes)
323            bin_len = self._lib.BN_bn2bin(bn, bin_ptr)
324            # A zero length means the BN has value 0
325            self.openssl_assert(bin_len >= 0)
326            return int.from_bytes(self._ffi.buffer(bin_ptr)[:bin_len], "big")
327        else:
328            # Under Python 2 the best we can do is hex()
329            hex_cdata = self._lib.BN_bn2hex(bn)
330            self.openssl_assert(hex_cdata != self._ffi.NULL)
331            hex_str = self._ffi.string(hex_cdata)
332            self._lib.OPENSSL_free(hex_cdata)
333            return int(hex_str, 16)
334
335    def _int_to_bn(self, num, bn=None):
336        """
337        Converts a python integer to a BIGNUM. The returned BIGNUM will not
338        be garbage collected (to support adding them to structs that take
339        ownership of the object). Be sure to register it for GC if it will
340        be discarded after use.
341        """
342        assert bn is None or bn != self._ffi.NULL
343
344        if bn is None:
345            bn = self._ffi.NULL
346
347        if not six.PY2:
348            # Python 3 has constant time to_bytes, so use that.
349
350            binary = num.to_bytes(int(num.bit_length() / 8.0 + 1), "big")
351            bn_ptr = self._lib.BN_bin2bn(binary, len(binary), bn)
352            self.openssl_assert(bn_ptr != self._ffi.NULL)
353            return bn_ptr
354
355        else:
356            # Under Python 2 the best we can do is hex(), [2:] removes the 0x
357            # prefix.
358            hex_num = hex(num).rstrip("L")[2:].encode("ascii")
359            bn_ptr = self._ffi.new("BIGNUM **")
360            bn_ptr[0] = bn
361            res = self._lib.BN_hex2bn(bn_ptr, hex_num)
362            self.openssl_assert(res != 0)
363            self.openssl_assert(bn_ptr[0] != self._ffi.NULL)
364            return bn_ptr[0]
365
366    def generate_rsa_private_key(self, public_exponent, key_size):
367        rsa._verify_rsa_parameters(public_exponent, key_size)
368
369        rsa_cdata = self._lib.RSA_new()
370        self.openssl_assert(rsa_cdata != self._ffi.NULL)
371        rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
372
373        bn = self._int_to_bn(public_exponent)
374        bn = self._ffi.gc(bn, self._lib.BN_free)
375
376        res = self._lib.RSA_generate_key_ex(
377            rsa_cdata, key_size, bn, self._ffi.NULL
378        )
379        self.openssl_assert(res == 1)
380        evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
381
382        return _RSAPrivateKey(self, rsa_cdata, evp_pkey)
383
384    def generate_rsa_parameters_supported(self, public_exponent, key_size):
385        return (public_exponent >= 3 and public_exponent & 1 != 0 and
386                key_size >= 512)
387
388    def load_rsa_private_numbers(self, numbers):
389        rsa._check_private_key_components(
390            numbers.p,
391            numbers.q,
392            numbers.d,
393            numbers.dmp1,
394            numbers.dmq1,
395            numbers.iqmp,
396            numbers.public_numbers.e,
397            numbers.public_numbers.n
398        )
399        rsa_cdata = self._lib.RSA_new()
400        self.openssl_assert(rsa_cdata != self._ffi.NULL)
401        rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
402        p = self._int_to_bn(numbers.p)
403        q = self._int_to_bn(numbers.q)
404        d = self._int_to_bn(numbers.d)
405        dmp1 = self._int_to_bn(numbers.dmp1)
406        dmq1 = self._int_to_bn(numbers.dmq1)
407        iqmp = self._int_to_bn(numbers.iqmp)
408        e = self._int_to_bn(numbers.public_numbers.e)
409        n = self._int_to_bn(numbers.public_numbers.n)
410        res = self._lib.RSA_set0_factors(rsa_cdata, p, q)
411        self.openssl_assert(res == 1)
412        res = self._lib.RSA_set0_key(rsa_cdata, n, e, d)
413        self.openssl_assert(res == 1)
414        res = self._lib.RSA_set0_crt_params(rsa_cdata, dmp1, dmq1, iqmp)
415        self.openssl_assert(res == 1)
416        res = self._lib.RSA_blinding_on(rsa_cdata, self._ffi.NULL)
417        self.openssl_assert(res == 1)
418        evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
419
420        return _RSAPrivateKey(self, rsa_cdata, evp_pkey)
421
422    def load_rsa_public_numbers(self, numbers):
423        rsa._check_public_key_components(numbers.e, numbers.n)
424        rsa_cdata = self._lib.RSA_new()
425        self.openssl_assert(rsa_cdata != self._ffi.NULL)
426        rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
427        e = self._int_to_bn(numbers.e)
428        n = self._int_to_bn(numbers.n)
429        res = self._lib.RSA_set0_key(rsa_cdata, n, e, self._ffi.NULL)
430        self.openssl_assert(res == 1)
431        evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
432
433        return _RSAPublicKey(self, rsa_cdata, evp_pkey)
434
435    def _create_evp_pkey_gc(self):
436        evp_pkey = self._lib.EVP_PKEY_new()
437        self.openssl_assert(evp_pkey != self._ffi.NULL)
438        evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
439        return evp_pkey
440
441    def _rsa_cdata_to_evp_pkey(self, rsa_cdata):
442        evp_pkey = self._create_evp_pkey_gc()
443        res = self._lib.EVP_PKEY_set1_RSA(evp_pkey, rsa_cdata)
444        self.openssl_assert(res == 1)
445        return evp_pkey
446
447    def _bytes_to_bio(self, data):
448        """
449        Return a _MemoryBIO namedtuple of (BIO, char*).
450
451        The char* is the storage for the BIO and it must stay alive until the
452        BIO is finished with.
453        """
454        data_ptr = self._ffi.from_buffer(data)
455        bio = self._lib.BIO_new_mem_buf(
456            data_ptr, len(data)
457        )
458        self.openssl_assert(bio != self._ffi.NULL)
459
460        return _MemoryBIO(self._ffi.gc(bio, self._lib.BIO_free), data_ptr)
461
462    def _create_mem_bio_gc(self):
463        """
464        Creates an empty memory BIO.
465        """
466        bio_method = self._lib.BIO_s_mem()
467        self.openssl_assert(bio_method != self._ffi.NULL)
468        bio = self._lib.BIO_new(bio_method)
469        self.openssl_assert(bio != self._ffi.NULL)
470        bio = self._ffi.gc(bio, self._lib.BIO_free)
471        return bio
472
473    def _read_mem_bio(self, bio):
474        """
475        Reads a memory BIO. This only works on memory BIOs.
476        """
477        buf = self._ffi.new("char **")
478        buf_len = self._lib.BIO_get_mem_data(bio, buf)
479        self.openssl_assert(buf_len > 0)
480        self.openssl_assert(buf[0] != self._ffi.NULL)
481        bio_data = self._ffi.buffer(buf[0], buf_len)[:]
482        return bio_data
483
484    def _evp_pkey_to_private_key(self, evp_pkey):
485        """
486        Return the appropriate type of PrivateKey given an evp_pkey cdata
487        pointer.
488        """
489
490        key_type = self._lib.EVP_PKEY_id(evp_pkey)
491
492        if key_type == self._lib.EVP_PKEY_RSA:
493            rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
494            self.openssl_assert(rsa_cdata != self._ffi.NULL)
495            rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
496            return _RSAPrivateKey(self, rsa_cdata, evp_pkey)
497        elif key_type == self._lib.EVP_PKEY_DSA:
498            dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey)
499            self.openssl_assert(dsa_cdata != self._ffi.NULL)
500            dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
501            return _DSAPrivateKey(self, dsa_cdata, evp_pkey)
502        elif key_type == self._lib.EVP_PKEY_EC:
503            ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey)
504            self.openssl_assert(ec_cdata != self._ffi.NULL)
505            ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
506            return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
507        elif key_type in self._dh_types:
508            dh_cdata = self._lib.EVP_PKEY_get1_DH(evp_pkey)
509            self.openssl_assert(dh_cdata != self._ffi.NULL)
510            dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
511            return _DHPrivateKey(self, dh_cdata, evp_pkey)
512        elif key_type == getattr(self._lib, "EVP_PKEY_X448", None):
513            # EVP_PKEY_X448 is not present in OpenSSL < 1.1.1
514            return _X448PrivateKey(self, evp_pkey)
515        elif key_type == getattr(self._lib, "EVP_PKEY_X25519", None):
516            # EVP_PKEY_X25519 is not present in OpenSSL < 1.1.0
517            return _X25519PrivateKey(self, evp_pkey)
518        else:
519            raise UnsupportedAlgorithm("Unsupported key type.")
520
521    def _evp_pkey_to_public_key(self, evp_pkey):
522        """
523        Return the appropriate type of PublicKey given an evp_pkey cdata
524        pointer.
525        """
526
527        key_type = self._lib.EVP_PKEY_id(evp_pkey)
528
529        if key_type == self._lib.EVP_PKEY_RSA:
530            rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
531            self.openssl_assert(rsa_cdata != self._ffi.NULL)
532            rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
533            return _RSAPublicKey(self, rsa_cdata, evp_pkey)
534        elif key_type == self._lib.EVP_PKEY_DSA:
535            dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey)
536            self.openssl_assert(dsa_cdata != self._ffi.NULL)
537            dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
538            return _DSAPublicKey(self, dsa_cdata, evp_pkey)
539        elif key_type == self._lib.EVP_PKEY_EC:
540            ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey)
541            self.openssl_assert(ec_cdata != self._ffi.NULL)
542            ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
543            return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)
544        elif key_type in self._dh_types:
545            dh_cdata = self._lib.EVP_PKEY_get1_DH(evp_pkey)
546            self.openssl_assert(dh_cdata != self._ffi.NULL)
547            dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
548            return _DHPublicKey(self, dh_cdata, evp_pkey)
549        elif key_type == getattr(self._lib, "EVP_PKEY_X448", None):
550            # EVP_PKEY_X448 is not present in OpenSSL < 1.1.1
551            return _X448PublicKey(self, evp_pkey)
552        elif key_type == getattr(self._lib, "EVP_PKEY_X25519", None):
553            # EVP_PKEY_X25519 is not present in OpenSSL < 1.1.0
554            return _X25519PublicKey(self, evp_pkey)
555        else:
556            raise UnsupportedAlgorithm("Unsupported key type.")
557
558    def _oaep_hash_supported(self, algorithm):
559        if self._lib.Cryptography_HAS_RSA_OAEP_MD:
560            return isinstance(
561                algorithm, (
562                    hashes.SHA1,
563                    hashes.SHA224,
564                    hashes.SHA256,
565                    hashes.SHA384,
566                    hashes.SHA512,
567                )
568            )
569        else:
570            return isinstance(algorithm, hashes.SHA1)
571
572    def rsa_padding_supported(self, padding):
573        if isinstance(padding, PKCS1v15):
574            return True
575        elif isinstance(padding, PSS) and isinstance(padding._mgf, MGF1):
576            return self.hash_supported(padding._mgf._algorithm)
577        elif isinstance(padding, OAEP) and isinstance(padding._mgf, MGF1):
578            return (
579                self._oaep_hash_supported(padding._mgf._algorithm) and
580                self._oaep_hash_supported(padding._algorithm) and
581                (
582                    (padding._label is None or len(padding._label) == 0) or
583                    self._lib.Cryptography_HAS_RSA_OAEP_LABEL == 1
584                )
585            )
586        else:
587            return False
588
589    def generate_dsa_parameters(self, key_size):
590        if key_size not in (1024, 2048, 3072):
591            raise ValueError("Key size must be 1024 or 2048 or 3072 bits.")
592
593        ctx = self._lib.DSA_new()
594        self.openssl_assert(ctx != self._ffi.NULL)
595        ctx = self._ffi.gc(ctx, self._lib.DSA_free)
596
597        res = self._lib.DSA_generate_parameters_ex(
598            ctx, key_size, self._ffi.NULL, 0,
599            self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
600        )
601
602        self.openssl_assert(res == 1)
603
604        return _DSAParameters(self, ctx)
605
606    def generate_dsa_private_key(self, parameters):
607        ctx = self._lib.DSAparams_dup(parameters._dsa_cdata)
608        self.openssl_assert(ctx != self._ffi.NULL)
609        ctx = self._ffi.gc(ctx, self._lib.DSA_free)
610        self._lib.DSA_generate_key(ctx)
611        evp_pkey = self._dsa_cdata_to_evp_pkey(ctx)
612
613        return _DSAPrivateKey(self, ctx, evp_pkey)
614
615    def generate_dsa_private_key_and_parameters(self, key_size):
616        parameters = self.generate_dsa_parameters(key_size)
617        return self.generate_dsa_private_key(parameters)
618
619    def _dsa_cdata_set_values(self, dsa_cdata, p, q, g, pub_key, priv_key):
620        res = self._lib.DSA_set0_pqg(dsa_cdata, p, q, g)
621        self.openssl_assert(res == 1)
622        res = self._lib.DSA_set0_key(dsa_cdata, pub_key, priv_key)
623        self.openssl_assert(res == 1)
624
625    def load_dsa_private_numbers(self, numbers):
626        dsa._check_dsa_private_numbers(numbers)
627        parameter_numbers = numbers.public_numbers.parameter_numbers
628
629        dsa_cdata = self._lib.DSA_new()
630        self.openssl_assert(dsa_cdata != self._ffi.NULL)
631        dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
632
633        p = self._int_to_bn(parameter_numbers.p)
634        q = self._int_to_bn(parameter_numbers.q)
635        g = self._int_to_bn(parameter_numbers.g)
636        pub_key = self._int_to_bn(numbers.public_numbers.y)
637        priv_key = self._int_to_bn(numbers.x)
638        self._dsa_cdata_set_values(dsa_cdata, p, q, g, pub_key, priv_key)
639
640        evp_pkey = self._dsa_cdata_to_evp_pkey(dsa_cdata)
641
642        return _DSAPrivateKey(self, dsa_cdata, evp_pkey)
643
644    def load_dsa_public_numbers(self, numbers):
645        dsa._check_dsa_parameters(numbers.parameter_numbers)
646        dsa_cdata = self._lib.DSA_new()
647        self.openssl_assert(dsa_cdata != self._ffi.NULL)
648        dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
649
650        p = self._int_to_bn(numbers.parameter_numbers.p)
651        q = self._int_to_bn(numbers.parameter_numbers.q)
652        g = self._int_to_bn(numbers.parameter_numbers.g)
653        pub_key = self._int_to_bn(numbers.y)
654        priv_key = self._ffi.NULL
655        self._dsa_cdata_set_values(dsa_cdata, p, q, g, pub_key, priv_key)
656
657        evp_pkey = self._dsa_cdata_to_evp_pkey(dsa_cdata)
658
659        return _DSAPublicKey(self, dsa_cdata, evp_pkey)
660
661    def load_dsa_parameter_numbers(self, numbers):
662        dsa._check_dsa_parameters(numbers)
663        dsa_cdata = self._lib.DSA_new()
664        self.openssl_assert(dsa_cdata != self._ffi.NULL)
665        dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
666
667        p = self._int_to_bn(numbers.p)
668        q = self._int_to_bn(numbers.q)
669        g = self._int_to_bn(numbers.g)
670        res = self._lib.DSA_set0_pqg(dsa_cdata, p, q, g)
671        self.openssl_assert(res == 1)
672
673        return _DSAParameters(self, dsa_cdata)
674
675    def _dsa_cdata_to_evp_pkey(self, dsa_cdata):
676        evp_pkey = self._create_evp_pkey_gc()
677        res = self._lib.EVP_PKEY_set1_DSA(evp_pkey, dsa_cdata)
678        self.openssl_assert(res == 1)
679        return evp_pkey
680
681    def dsa_hash_supported(self, algorithm):
682        return self.hash_supported(algorithm)
683
684    def dsa_parameters_supported(self, p, q, g):
685        return True
686
687    def cmac_algorithm_supported(self, algorithm):
688        return self.cipher_supported(
689            algorithm, CBC(b"\x00" * algorithm.block_size)
690        )
691
692    def create_cmac_ctx(self, algorithm):
693        return _CMACContext(self, algorithm)
694
695    def create_x509_csr(self, builder, private_key, algorithm):
696        if not isinstance(algorithm, hashes.HashAlgorithm):
697            raise TypeError('Algorithm must be a registered hash algorithm.')
698
699        if (
700            isinstance(algorithm, hashes.MD5) and not
701            isinstance(private_key, rsa.RSAPrivateKey)
702        ):
703            raise ValueError(
704                "MD5 is not a supported hash algorithm for EC/DSA CSRs"
705            )
706
707        # Resolve the signature algorithm.
708        evp_md = self._evp_md_non_null_from_algorithm(algorithm)
709
710        # Create an empty request.
711        x509_req = self._lib.X509_REQ_new()
712        self.openssl_assert(x509_req != self._ffi.NULL)
713        x509_req = self._ffi.gc(x509_req, self._lib.X509_REQ_free)
714
715        # Set x509 version.
716        res = self._lib.X509_REQ_set_version(x509_req, x509.Version.v1.value)
717        self.openssl_assert(res == 1)
718
719        # Set subject name.
720        res = self._lib.X509_REQ_set_subject_name(
721            x509_req, _encode_name_gc(self, builder._subject_name)
722        )
723        self.openssl_assert(res == 1)
724
725        # Set subject public key.
726        public_key = private_key.public_key()
727        res = self._lib.X509_REQ_set_pubkey(
728            x509_req, public_key._evp_pkey
729        )
730        self.openssl_assert(res == 1)
731
732        # Add extensions.
733        sk_extension = self._lib.sk_X509_EXTENSION_new_null()
734        self.openssl_assert(sk_extension != self._ffi.NULL)
735        sk_extension = self._ffi.gc(
736            sk_extension,
737            lambda x: self._lib.sk_X509_EXTENSION_pop_free(
738                x, self._ffi.addressof(
739                    self._lib._original_lib, "X509_EXTENSION_free"
740                )
741            )
742        )
743        # Don't GC individual extensions because the memory is owned by
744        # sk_extensions and will be freed along with it.
745        self._create_x509_extensions(
746            extensions=builder._extensions,
747            handlers=_EXTENSION_ENCODE_HANDLERS,
748            x509_obj=sk_extension,
749            add_func=self._lib.sk_X509_EXTENSION_insert,
750            gc=False
751        )
752        res = self._lib.X509_REQ_add_extensions(x509_req, sk_extension)
753        self.openssl_assert(res == 1)
754
755        # Sign the request using the requester's private key.
756        res = self._lib.X509_REQ_sign(
757            x509_req, private_key._evp_pkey, evp_md
758        )
759        if res == 0:
760            errors = self._consume_errors()
761            self.openssl_assert(
762                errors[0]._lib_reason_match(
763                    self._lib.ERR_LIB_RSA,
764                    self._lib.RSA_R_DIGEST_TOO_BIG_FOR_RSA_KEY
765                )
766            )
767
768            raise ValueError("Digest too big for RSA key")
769
770        return _CertificateSigningRequest(self, x509_req)
771
772    def create_x509_certificate(self, builder, private_key, algorithm):
773        if not isinstance(builder, x509.CertificateBuilder):
774            raise TypeError('Builder type mismatch.')
775        if not isinstance(algorithm, hashes.HashAlgorithm):
776            raise TypeError('Algorithm must be a registered hash algorithm.')
777
778        if (
779            isinstance(algorithm, hashes.MD5) and not
780            isinstance(private_key, rsa.RSAPrivateKey)
781        ):
782            raise ValueError(
783                "MD5 is not a supported hash algorithm for EC/DSA certificates"
784            )
785
786        # Resolve the signature algorithm.
787        evp_md = self._evp_md_non_null_from_algorithm(algorithm)
788
789        # Create an empty certificate.
790        x509_cert = self._lib.X509_new()
791        x509_cert = self._ffi.gc(x509_cert, backend._lib.X509_free)
792
793        # Set the x509 version.
794        res = self._lib.X509_set_version(x509_cert, builder._version.value)
795        self.openssl_assert(res == 1)
796
797        # Set the subject's name.
798        res = self._lib.X509_set_subject_name(
799            x509_cert, _encode_name_gc(self, builder._subject_name)
800        )
801        self.openssl_assert(res == 1)
802
803        # Set the subject's public key.
804        res = self._lib.X509_set_pubkey(
805            x509_cert, builder._public_key._evp_pkey
806        )
807        self.openssl_assert(res == 1)
808
809        # Set the certificate serial number.
810        serial_number = _encode_asn1_int_gc(self, builder._serial_number)
811        res = self._lib.X509_set_serialNumber(x509_cert, serial_number)
812        self.openssl_assert(res == 1)
813
814        # Set the "not before" time.
815        self._set_asn1_time(
816            self._lib.X509_get_notBefore(x509_cert), builder._not_valid_before
817        )
818
819        # Set the "not after" time.
820        self._set_asn1_time(
821            self._lib.X509_get_notAfter(x509_cert), builder._not_valid_after
822        )
823
824        # Add extensions.
825        self._create_x509_extensions(
826            extensions=builder._extensions,
827            handlers=_EXTENSION_ENCODE_HANDLERS,
828            x509_obj=x509_cert,
829            add_func=self._lib.X509_add_ext,
830            gc=True
831        )
832
833        # Set the issuer name.
834        res = self._lib.X509_set_issuer_name(
835            x509_cert, _encode_name_gc(self, builder._issuer_name)
836        )
837        self.openssl_assert(res == 1)
838
839        # Sign the certificate with the issuer's private key.
840        res = self._lib.X509_sign(
841            x509_cert, private_key._evp_pkey, evp_md
842        )
843        if res == 0:
844            errors = self._consume_errors()
845            self.openssl_assert(
846                errors[0]._lib_reason_match(
847                    self._lib.ERR_LIB_RSA,
848                    self._lib.RSA_R_DIGEST_TOO_BIG_FOR_RSA_KEY
849                )
850            )
851            raise ValueError("Digest too big for RSA key")
852
853        return _Certificate(self, x509_cert)
854
855    def _set_asn1_time(self, asn1_time, time):
856        if time.year >= 2050:
857            asn1_str = time.strftime('%Y%m%d%H%M%SZ').encode('ascii')
858        else:
859            asn1_str = time.strftime('%y%m%d%H%M%SZ').encode('ascii')
860        res = self._lib.ASN1_TIME_set_string(asn1_time, asn1_str)
861        self.openssl_assert(res == 1)
862
863    def _create_asn1_time(self, time):
864        asn1_time = self._lib.ASN1_TIME_new()
865        self.openssl_assert(asn1_time != self._ffi.NULL)
866        asn1_time = self._ffi.gc(asn1_time, self._lib.ASN1_TIME_free)
867        self._set_asn1_time(asn1_time, time)
868        return asn1_time
869
870    def create_x509_crl(self, builder, private_key, algorithm):
871        if not isinstance(builder, x509.CertificateRevocationListBuilder):
872            raise TypeError('Builder type mismatch.')
873        if not isinstance(algorithm, hashes.HashAlgorithm):
874            raise TypeError('Algorithm must be a registered hash algorithm.')
875
876        if (
877            isinstance(algorithm, hashes.MD5) and not
878            isinstance(private_key, rsa.RSAPrivateKey)
879        ):
880            raise ValueError(
881                "MD5 is not a supported hash algorithm for EC/DSA CRLs"
882            )
883
884        evp_md = self._evp_md_non_null_from_algorithm(algorithm)
885
886        # Create an empty CRL.
887        x509_crl = self._lib.X509_CRL_new()
888        x509_crl = self._ffi.gc(x509_crl, backend._lib.X509_CRL_free)
889
890        # Set the x509 CRL version. We only support v2 (integer value 1).
891        res = self._lib.X509_CRL_set_version(x509_crl, 1)
892        self.openssl_assert(res == 1)
893
894        # Set the issuer name.
895        res = self._lib.X509_CRL_set_issuer_name(
896            x509_crl, _encode_name_gc(self, builder._issuer_name)
897        )
898        self.openssl_assert(res == 1)
899
900        # Set the last update time.
901        last_update = self._create_asn1_time(builder._last_update)
902        res = self._lib.X509_CRL_set_lastUpdate(x509_crl, last_update)
903        self.openssl_assert(res == 1)
904
905        # Set the next update time.
906        next_update = self._create_asn1_time(builder._next_update)
907        res = self._lib.X509_CRL_set_nextUpdate(x509_crl, next_update)
908        self.openssl_assert(res == 1)
909
910        # Add extensions.
911        self._create_x509_extensions(
912            extensions=builder._extensions,
913            handlers=_CRL_EXTENSION_ENCODE_HANDLERS,
914            x509_obj=x509_crl,
915            add_func=self._lib.X509_CRL_add_ext,
916            gc=True
917        )
918
919        # add revoked certificates
920        for revoked_cert in builder._revoked_certificates:
921            # Duplicating because the X509_CRL takes ownership and will free
922            # this memory when X509_CRL_free is called.
923            revoked = self._lib.Cryptography_X509_REVOKED_dup(
924                revoked_cert._x509_revoked
925            )
926            self.openssl_assert(revoked != self._ffi.NULL)
927            res = self._lib.X509_CRL_add0_revoked(x509_crl, revoked)
928            self.openssl_assert(res == 1)
929
930        res = self._lib.X509_CRL_sign(
931            x509_crl, private_key._evp_pkey, evp_md
932        )
933        if res == 0:
934            errors = self._consume_errors()
935            self.openssl_assert(
936                errors[0]._lib_reason_match(
937                    self._lib.ERR_LIB_RSA,
938                    self._lib.RSA_R_DIGEST_TOO_BIG_FOR_RSA_KEY
939                )
940            )
941            raise ValueError("Digest too big for RSA key")
942
943        return _CertificateRevocationList(self, x509_crl)
944
945    def _create_x509_extensions(self, extensions, handlers, x509_obj,
946                                add_func, gc):
947        for i, extension in enumerate(extensions):
948            x509_extension = self._create_x509_extension(
949                handlers, extension
950            )
951            self.openssl_assert(x509_extension != self._ffi.NULL)
952
953            if gc:
954                x509_extension = self._ffi.gc(
955                    x509_extension, self._lib.X509_EXTENSION_free
956                )
957            res = add_func(x509_obj, x509_extension, i)
958            self.openssl_assert(res >= 1)
959
960    def _create_raw_x509_extension(self, extension, value):
961        obj = _txt2obj_gc(self, extension.oid.dotted_string)
962        return self._lib.X509_EXTENSION_create_by_OBJ(
963            self._ffi.NULL, obj, 1 if extension.critical else 0, value
964        )
965
966    def _create_x509_extension(self, handlers, extension):
967        if isinstance(extension.value, x509.UnrecognizedExtension):
968            value = _encode_asn1_str_gc(self, extension.value.value)
969            return self._create_raw_x509_extension(extension, value)
970        elif isinstance(extension.value, x509.TLSFeature):
971            asn1 = _Integers([x.value for x in extension.value]).dump()
972            value = _encode_asn1_str_gc(self, asn1)
973            return self._create_raw_x509_extension(extension, value)
974        elif isinstance(extension.value, x509.PrecertPoison):
975            asn1 = asn1crypto.core.Null().dump()
976            value = _encode_asn1_str_gc(self, asn1)
977            return self._create_raw_x509_extension(extension, value)
978        else:
979            try:
980                encode = handlers[extension.oid]
981            except KeyError:
982                raise NotImplementedError(
983                    'Extension not supported: {0}'.format(extension.oid)
984                )
985
986            ext_struct = encode(self, extension.value)
987            nid = self._lib.OBJ_txt2nid(
988                extension.oid.dotted_string.encode("ascii")
989            )
990            backend.openssl_assert(nid != self._lib.NID_undef)
991            return self._lib.X509V3_EXT_i2d(
992                nid, 1 if extension.critical else 0, ext_struct
993            )
994
995    def create_x509_revoked_certificate(self, builder):
996        if not isinstance(builder, x509.RevokedCertificateBuilder):
997            raise TypeError('Builder type mismatch.')
998
999        x509_revoked = self._lib.X509_REVOKED_new()
1000        self.openssl_assert(x509_revoked != self._ffi.NULL)
1001        x509_revoked = self._ffi.gc(x509_revoked, self._lib.X509_REVOKED_free)
1002        serial_number = _encode_asn1_int_gc(self, builder._serial_number)
1003        res = self._lib.X509_REVOKED_set_serialNumber(
1004            x509_revoked, serial_number
1005        )
1006        self.openssl_assert(res == 1)
1007        rev_date = self._create_asn1_time(builder._revocation_date)
1008        res = self._lib.X509_REVOKED_set_revocationDate(x509_revoked, rev_date)
1009        self.openssl_assert(res == 1)
1010        # add CRL entry extensions
1011        self._create_x509_extensions(
1012            extensions=builder._extensions,
1013            handlers=_CRL_ENTRY_EXTENSION_ENCODE_HANDLERS,
1014            x509_obj=x509_revoked,
1015            add_func=self._lib.X509_REVOKED_add_ext,
1016            gc=True
1017        )
1018        return _RevokedCertificate(self, None, x509_revoked)
1019
1020    def load_pem_private_key(self, data, password):
1021        return self._load_key(
1022            self._lib.PEM_read_bio_PrivateKey,
1023            self._evp_pkey_to_private_key,
1024            data,
1025            password,
1026        )
1027
1028    def load_pem_public_key(self, data):
1029        mem_bio = self._bytes_to_bio(data)
1030        evp_pkey = self._lib.PEM_read_bio_PUBKEY(
1031            mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
1032        )
1033        if evp_pkey != self._ffi.NULL:
1034            evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
1035            return self._evp_pkey_to_public_key(evp_pkey)
1036        else:
1037            # It's not a (RSA/DSA/ECDSA) subjectPublicKeyInfo, but we still
1038            # need to check to see if it is a pure PKCS1 RSA public key (not
1039            # embedded in a subjectPublicKeyInfo)
1040            self._consume_errors()
1041            res = self._lib.BIO_reset(mem_bio.bio)
1042            self.openssl_assert(res == 1)
1043            rsa_cdata = self._lib.PEM_read_bio_RSAPublicKey(
1044                mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
1045            )
1046            if rsa_cdata != self._ffi.NULL:
1047                rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
1048                evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
1049                return _RSAPublicKey(self, rsa_cdata, evp_pkey)
1050            else:
1051                self._handle_key_loading_error()
1052
1053    def load_pem_parameters(self, data):
1054        mem_bio = self._bytes_to_bio(data)
1055        # only DH is supported currently
1056        dh_cdata = self._lib.PEM_read_bio_DHparams(
1057            mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL)
1058        if dh_cdata != self._ffi.NULL:
1059            dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
1060            return _DHParameters(self, dh_cdata)
1061        else:
1062            self._handle_key_loading_error()
1063
1064    def load_der_private_key(self, data, password):
1065        # OpenSSL has a function called d2i_AutoPrivateKey that in theory
1066        # handles this automatically, however it doesn't handle encrypted
1067        # private keys. Instead we try to load the key two different ways.
1068        # First we'll try to load it as a traditional key.
1069        bio_data = self._bytes_to_bio(data)
1070        key = self._evp_pkey_from_der_traditional_key(bio_data, password)
1071        if key:
1072            return self._evp_pkey_to_private_key(key)
1073        else:
1074            # Finally we try to load it with the method that handles encrypted
1075            # PKCS8 properly.
1076            return self._load_key(
1077                self._lib.d2i_PKCS8PrivateKey_bio,
1078                self._evp_pkey_to_private_key,
1079                data,
1080                password,
1081            )
1082
1083    def _evp_pkey_from_der_traditional_key(self, bio_data, password):
1084        key = self._lib.d2i_PrivateKey_bio(bio_data.bio, self._ffi.NULL)
1085        if key != self._ffi.NULL:
1086            key = self._ffi.gc(key, self._lib.EVP_PKEY_free)
1087            if password is not None:
1088                raise TypeError(
1089                    "Password was given but private key is not encrypted."
1090                )
1091
1092            return key
1093        else:
1094            self._consume_errors()
1095            return None
1096
1097    def load_der_public_key(self, data):
1098        mem_bio = self._bytes_to_bio(data)
1099        evp_pkey = self._lib.d2i_PUBKEY_bio(mem_bio.bio, self._ffi.NULL)
1100        if evp_pkey != self._ffi.NULL:
1101            evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
1102            return self._evp_pkey_to_public_key(evp_pkey)
1103        else:
1104            # It's not a (RSA/DSA/ECDSA) subjectPublicKeyInfo, but we still
1105            # need to check to see if it is a pure PKCS1 RSA public key (not
1106            # embedded in a subjectPublicKeyInfo)
1107            self._consume_errors()
1108            res = self._lib.BIO_reset(mem_bio.bio)
1109            self.openssl_assert(res == 1)
1110            rsa_cdata = self._lib.d2i_RSAPublicKey_bio(
1111                mem_bio.bio, self._ffi.NULL
1112            )
1113            if rsa_cdata != self._ffi.NULL:
1114                rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
1115                evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
1116                return _RSAPublicKey(self, rsa_cdata, evp_pkey)
1117            else:
1118                self._handle_key_loading_error()
1119
1120    def load_der_parameters(self, data):
1121        mem_bio = self._bytes_to_bio(data)
1122        dh_cdata = self._lib.d2i_DHparams_bio(
1123            mem_bio.bio, self._ffi.NULL
1124        )
1125        if dh_cdata != self._ffi.NULL:
1126            dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
1127            return _DHParameters(self, dh_cdata)
1128        elif self._lib.Cryptography_HAS_EVP_PKEY_DHX:
1129            # We check to see if the is dhx.
1130            self._consume_errors()
1131            res = self._lib.BIO_reset(mem_bio.bio)
1132            self.openssl_assert(res == 1)
1133            dh_cdata = self._lib.Cryptography_d2i_DHxparams_bio(
1134                mem_bio.bio, self._ffi.NULL
1135            )
1136            if dh_cdata != self._ffi.NULL:
1137                dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
1138                return _DHParameters(self, dh_cdata)
1139
1140        self._handle_key_loading_error()
1141
1142    def load_pem_x509_certificate(self, data):
1143        mem_bio = self._bytes_to_bio(data)
1144        x509 = self._lib.PEM_read_bio_X509(
1145            mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
1146        )
1147        if x509 == self._ffi.NULL:
1148            self._consume_errors()
1149            raise ValueError(
1150                "Unable to load certificate. See https://cryptography.io/en/la"
1151                "test/faq/#why-can-t-i-import-my-pem-file for more details."
1152            )
1153
1154        x509 = self._ffi.gc(x509, self._lib.X509_free)
1155        return _Certificate(self, x509)
1156
1157    def load_der_x509_certificate(self, data):
1158        mem_bio = self._bytes_to_bio(data)
1159        x509 = self._lib.d2i_X509_bio(mem_bio.bio, self._ffi.NULL)
1160        if x509 == self._ffi.NULL:
1161            self._consume_errors()
1162            raise ValueError("Unable to load certificate")
1163
1164        x509 = self._ffi.gc(x509, self._lib.X509_free)
1165        return _Certificate(self, x509)
1166
1167    def load_pem_x509_crl(self, data):
1168        mem_bio = self._bytes_to_bio(data)
1169        x509_crl = self._lib.PEM_read_bio_X509_CRL(
1170            mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
1171        )
1172        if x509_crl == self._ffi.NULL:
1173            self._consume_errors()
1174            raise ValueError(
1175                "Unable to load CRL. See https://cryptography.io/en/la"
1176                "test/faq/#why-can-t-i-import-my-pem-file for more details."
1177            )
1178
1179        x509_crl = self._ffi.gc(x509_crl, self._lib.X509_CRL_free)
1180        return _CertificateRevocationList(self, x509_crl)
1181
1182    def load_der_x509_crl(self, data):
1183        mem_bio = self._bytes_to_bio(data)
1184        x509_crl = self._lib.d2i_X509_CRL_bio(mem_bio.bio, self._ffi.NULL)
1185        if x509_crl == self._ffi.NULL:
1186            self._consume_errors()
1187            raise ValueError("Unable to load CRL")
1188
1189        x509_crl = self._ffi.gc(x509_crl, self._lib.X509_CRL_free)
1190        return _CertificateRevocationList(self, x509_crl)
1191
1192    def load_pem_x509_csr(self, data):
1193        mem_bio = self._bytes_to_bio(data)
1194        x509_req = self._lib.PEM_read_bio_X509_REQ(
1195            mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
1196        )
1197        if x509_req == self._ffi.NULL:
1198            self._consume_errors()
1199            raise ValueError(
1200                "Unable to load request. See https://cryptography.io/en/la"
1201                "test/faq/#why-can-t-i-import-my-pem-file for more details."
1202            )
1203
1204        x509_req = self._ffi.gc(x509_req, self._lib.X509_REQ_free)
1205        return _CertificateSigningRequest(self, x509_req)
1206
1207    def load_der_x509_csr(self, data):
1208        mem_bio = self._bytes_to_bio(data)
1209        x509_req = self._lib.d2i_X509_REQ_bio(mem_bio.bio, self._ffi.NULL)
1210        if x509_req == self._ffi.NULL:
1211            self._consume_errors()
1212            raise ValueError("Unable to load request")
1213
1214        x509_req = self._ffi.gc(x509_req, self._lib.X509_REQ_free)
1215        return _CertificateSigningRequest(self, x509_req)
1216
1217    def _load_key(self, openssl_read_func, convert_func, data, password):
1218        mem_bio = self._bytes_to_bio(data)
1219
1220        userdata = self._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *")
1221        if password is not None:
1222            utils._check_byteslike("password", password)
1223            password_ptr = self._ffi.from_buffer(password)
1224            userdata.password = password_ptr
1225            userdata.length = len(password)
1226
1227        evp_pkey = openssl_read_func(
1228            mem_bio.bio,
1229            self._ffi.NULL,
1230            self._ffi.addressof(
1231                self._lib._original_lib, "Cryptography_pem_password_cb"
1232            ),
1233            userdata,
1234        )
1235
1236        if evp_pkey == self._ffi.NULL:
1237            if userdata.error != 0:
1238                errors = self._consume_errors()
1239                self.openssl_assert(errors)
1240                if userdata.error == -1:
1241                    raise TypeError(
1242                        "Password was not given but private key is encrypted"
1243                    )
1244                else:
1245                    assert userdata.error == -2
1246                    raise ValueError(
1247                        "Passwords longer than {0} bytes are not supported "
1248                        "by this backend.".format(userdata.maxsize - 1)
1249                    )
1250            else:
1251                self._handle_key_loading_error()
1252
1253        evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
1254
1255        if password is not None and userdata.called == 0:
1256            raise TypeError(
1257                "Password was given but private key is not encrypted.")
1258
1259        assert (
1260            (password is not None and userdata.called == 1) or
1261            password is None
1262        )
1263
1264        return convert_func(evp_pkey)
1265
1266    def _handle_key_loading_error(self):
1267        errors = self._consume_errors()
1268
1269        if not errors:
1270            raise ValueError("Could not deserialize key data.")
1271
1272        elif (
1273            errors[0]._lib_reason_match(
1274                self._lib.ERR_LIB_EVP, self._lib.EVP_R_BAD_DECRYPT
1275            ) or errors[0]._lib_reason_match(
1276                self._lib.ERR_LIB_PKCS12,
1277                self._lib.PKCS12_R_PKCS12_CIPHERFINAL_ERROR
1278            )
1279        ):
1280            raise ValueError("Bad decrypt. Incorrect password?")
1281
1282        elif (
1283            errors[0]._lib_reason_match(
1284                self._lib.ERR_LIB_EVP, self._lib.EVP_R_UNKNOWN_PBE_ALGORITHM
1285            ) or errors[0]._lib_reason_match(
1286                self._lib.ERR_LIB_PEM, self._lib.PEM_R_UNSUPPORTED_ENCRYPTION
1287            )
1288        ):
1289            raise UnsupportedAlgorithm(
1290                "PEM data is encrypted with an unsupported cipher",
1291                _Reasons.UNSUPPORTED_CIPHER
1292            )
1293
1294        elif any(
1295            error._lib_reason_match(
1296                self._lib.ERR_LIB_EVP,
1297                self._lib.EVP_R_UNSUPPORTED_PRIVATE_KEY_ALGORITHM
1298            )
1299            for error in errors
1300        ):
1301            raise ValueError("Unsupported public key algorithm.")
1302
1303        else:
1304            assert errors[0].lib in (
1305                self._lib.ERR_LIB_EVP,
1306                self._lib.ERR_LIB_PEM,
1307                self._lib.ERR_LIB_ASN1,
1308            )
1309            raise ValueError("Could not deserialize key data.")
1310
1311    def elliptic_curve_supported(self, curve):
1312        try:
1313            curve_nid = self._elliptic_curve_to_nid(curve)
1314        except UnsupportedAlgorithm:
1315            curve_nid = self._lib.NID_undef
1316
1317        group = self._lib.EC_GROUP_new_by_curve_name(curve_nid)
1318
1319        if group == self._ffi.NULL:
1320            errors = self._consume_errors()
1321            self.openssl_assert(
1322                curve_nid == self._lib.NID_undef or
1323                errors[0]._lib_reason_match(
1324                    self._lib.ERR_LIB_EC,
1325                    self._lib.EC_R_UNKNOWN_GROUP
1326                )
1327            )
1328            return False
1329        else:
1330            self.openssl_assert(curve_nid != self._lib.NID_undef)
1331            self._lib.EC_GROUP_free(group)
1332            return True
1333
1334    def elliptic_curve_signature_algorithm_supported(
1335        self, signature_algorithm, curve
1336    ):
1337        # We only support ECDSA right now.
1338        if not isinstance(signature_algorithm, ec.ECDSA):
1339            return False
1340
1341        return self.elliptic_curve_supported(curve)
1342
1343    def generate_elliptic_curve_private_key(self, curve):
1344        """
1345        Generate a new private key on the named curve.
1346        """
1347
1348        if self.elliptic_curve_supported(curve):
1349            ec_cdata = self._ec_key_new_by_curve(curve)
1350
1351            res = self._lib.EC_KEY_generate_key(ec_cdata)
1352            self.openssl_assert(res == 1)
1353
1354            evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
1355
1356            return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
1357        else:
1358            raise UnsupportedAlgorithm(
1359                "Backend object does not support {0}.".format(curve.name),
1360                _Reasons.UNSUPPORTED_ELLIPTIC_CURVE
1361            )
1362
1363    def load_elliptic_curve_private_numbers(self, numbers):
1364        public = numbers.public_numbers
1365
1366        ec_cdata = self._ec_key_new_by_curve(public.curve)
1367
1368        private_value = self._ffi.gc(
1369            self._int_to_bn(numbers.private_value), self._lib.BN_clear_free
1370        )
1371        res = self._lib.EC_KEY_set_private_key(ec_cdata, private_value)
1372        self.openssl_assert(res == 1)
1373
1374        ec_cdata = self._ec_key_set_public_key_affine_coordinates(
1375            ec_cdata, public.x, public.y)
1376
1377        evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
1378
1379        return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
1380
1381    def load_elliptic_curve_public_numbers(self, numbers):
1382        ec_cdata = self._ec_key_new_by_curve(numbers.curve)
1383        ec_cdata = self._ec_key_set_public_key_affine_coordinates(
1384            ec_cdata, numbers.x, numbers.y)
1385        evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
1386
1387        return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)
1388
1389    def load_elliptic_curve_public_bytes(self, curve, point_bytes):
1390        ec_cdata = self._ec_key_new_by_curve(curve)
1391        group = self._lib.EC_KEY_get0_group(ec_cdata)
1392        self.openssl_assert(group != self._ffi.NULL)
1393        point = self._lib.EC_POINT_new(group)
1394        self.openssl_assert(point != self._ffi.NULL)
1395        point = self._ffi.gc(point, self._lib.EC_POINT_free)
1396        with self._tmp_bn_ctx() as bn_ctx:
1397            res = self._lib.EC_POINT_oct2point(
1398                group, point, point_bytes, len(point_bytes), bn_ctx
1399            )
1400            if res != 1:
1401                self._consume_errors()
1402                raise ValueError("Invalid public bytes for the given curve")
1403
1404        res = self._lib.EC_KEY_set_public_key(ec_cdata, point)
1405        self.openssl_assert(res == 1)
1406        evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
1407        return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)
1408
1409    def derive_elliptic_curve_private_key(self, private_value, curve):
1410        ec_cdata = self._ec_key_new_by_curve(curve)
1411
1412        get_func, group = self._ec_key_determine_group_get_func(ec_cdata)
1413
1414        point = self._lib.EC_POINT_new(group)
1415        self.openssl_assert(point != self._ffi.NULL)
1416        point = self._ffi.gc(point, self._lib.EC_POINT_free)
1417
1418        value = self._int_to_bn(private_value)
1419        value = self._ffi.gc(value, self._lib.BN_clear_free)
1420
1421        with self._tmp_bn_ctx() as bn_ctx:
1422            res = self._lib.EC_POINT_mul(group, point, value, self._ffi.NULL,
1423                                         self._ffi.NULL, bn_ctx)
1424            self.openssl_assert(res == 1)
1425
1426            bn_x = self._lib.BN_CTX_get(bn_ctx)
1427            bn_y = self._lib.BN_CTX_get(bn_ctx)
1428
1429            res = get_func(group, point, bn_x, bn_y, bn_ctx)
1430            self.openssl_assert(res == 1)
1431
1432        res = self._lib.EC_KEY_set_public_key(ec_cdata, point)
1433        self.openssl_assert(res == 1)
1434        private = self._int_to_bn(private_value)
1435        private = self._ffi.gc(private, self._lib.BN_clear_free)
1436        res = self._lib.EC_KEY_set_private_key(ec_cdata, private)
1437        self.openssl_assert(res == 1)
1438
1439        evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
1440
1441        return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
1442
1443    def _ec_key_new_by_curve(self, curve):
1444        curve_nid = self._elliptic_curve_to_nid(curve)
1445        ec_cdata = self._lib.EC_KEY_new_by_curve_name(curve_nid)
1446        self.openssl_assert(ec_cdata != self._ffi.NULL)
1447        return self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
1448
1449    def load_der_ocsp_request(self, data):
1450        mem_bio = self._bytes_to_bio(data)
1451        request = self._lib.d2i_OCSP_REQUEST_bio(mem_bio.bio, self._ffi.NULL)
1452        if request == self._ffi.NULL:
1453            self._consume_errors()
1454            raise ValueError("Unable to load OCSP request")
1455
1456        request = self._ffi.gc(request, self._lib.OCSP_REQUEST_free)
1457        return _OCSPRequest(self, request)
1458
1459    def load_der_ocsp_response(self, data):
1460        mem_bio = self._bytes_to_bio(data)
1461        response = self._lib.d2i_OCSP_RESPONSE_bio(mem_bio.bio, self._ffi.NULL)
1462        if response == self._ffi.NULL:
1463            self._consume_errors()
1464            raise ValueError("Unable to load OCSP response")
1465
1466        response = self._ffi.gc(response, self._lib.OCSP_RESPONSE_free)
1467        return _OCSPResponse(self, response)
1468
1469    def create_ocsp_request(self, builder):
1470        ocsp_req = self._lib.OCSP_REQUEST_new()
1471        self.openssl_assert(ocsp_req != self._ffi.NULL)
1472        ocsp_req = self._ffi.gc(ocsp_req, self._lib.OCSP_REQUEST_free)
1473        cert, issuer, algorithm = builder._request
1474        evp_md = self._evp_md_non_null_from_algorithm(algorithm)
1475        certid = self._lib.OCSP_cert_to_id(
1476            evp_md, cert._x509, issuer._x509
1477        )
1478        self.openssl_assert(certid != self._ffi.NULL)
1479        onereq = self._lib.OCSP_request_add0_id(ocsp_req, certid)
1480        self.openssl_assert(onereq != self._ffi.NULL)
1481        self._create_x509_extensions(
1482            extensions=builder._extensions,
1483            handlers=_OCSP_REQUEST_EXTENSION_ENCODE_HANDLERS,
1484            x509_obj=ocsp_req,
1485            add_func=self._lib.OCSP_REQUEST_add_ext,
1486            gc=True,
1487        )
1488        return _OCSPRequest(self, ocsp_req)
1489
    def _create_ocsp_basic_response(self, builder, private_key, algorithm):
        """
        Build and sign an OCSP_BASICRESP for the single response stored on
        ``builder``.

        The certificate ID is computed from the response's certificate and
        issuer with the response's own hash algorithm; ``algorithm`` is the
        digest used when signing with ``private_key``.

        Returns the OCSP_BASICRESP cdata (registered with ffi.gc). Raises
        ValueError when OCSP_basic_sign fails with a key values mismatch.
        """
        basic = self._lib.OCSP_BASICRESP_new()
        self.openssl_assert(basic != self._ffi.NULL)
        basic = self._ffi.gc(basic, self._lib.OCSP_BASICRESP_free)
        # Hash used for the certificate ID (not for the signature).
        evp_md = self._evp_md_non_null_from_algorithm(
            builder._response._algorithm
        )
        certid = self._lib.OCSP_cert_to_id(
            evp_md, builder._response._cert._x509,
            builder._response._issuer._x509
        )
        self.openssl_assert(certid != self._ffi.NULL)
        certid = self._ffi.gc(certid, self._lib.OCSP_CERTID_free)
        # -1 is passed to OpenSSL when the builder carries no revocation
        # reason; otherwise the enum is mapped to its CRL reason code.
        if builder._response._revocation_reason is None:
            reason = -1
        else:
            reason = _CRL_ENTRY_REASON_ENUM_TO_CODE[
                builder._response._revocation_reason
            ]
        # Revocation time and next_update are optional (NULL when absent);
        # this_update is always converted.
        if builder._response._revocation_time is None:
            rev_time = self._ffi.NULL
        else:
            rev_time = self._create_asn1_time(
                builder._response._revocation_time
            )

        next_update = self._ffi.NULL
        if builder._response._next_update is not None:
            next_update = self._create_asn1_time(
                builder._response._next_update
            )

        this_update = self._create_asn1_time(builder._response._this_update)

        res = self._lib.OCSP_basic_add1_status(
            basic,
            certid,
            builder._response._cert_status.value,
            reason,
            rev_time,
            this_update,
            next_update
        )
        self.openssl_assert(res != self._ffi.NULL)
        # okay, now sign the basic structure
        # evp_md is rebound here: from now on it is the signature digest.
        evp_md = self._evp_md_non_null_from_algorithm(algorithm)
        responder_cert, responder_encoding = builder._responder_id
        flags = self._lib.OCSP_NOCERTS
        # HASH responder encoding adds the OCSP_RESPID_KEY flag.
        if responder_encoding is ocsp.OCSPResponderEncoding.HASH:
            flags |= self._lib.OCSP_RESPID_KEY

        # Certificates supplied on the builder are attached explicitly.
        if builder._certs is not None:
            for cert in builder._certs:
                res = self._lib.OCSP_basic_add1_cert(basic, cert._x509)
                self.openssl_assert(res == 1)

        self._create_x509_extensions(
            extensions=builder._extensions,
            handlers=_OCSP_BASICRESP_EXTENSION_ENCODE_HANDLERS,
            x509_obj=basic,
            add_func=self._lib.OCSP_BASICRESP_add_ext,
            gc=True,
        )

        res = self._lib.OCSP_basic_sign(
            basic, responder_cert._x509, private_key._evp_pkey,
            evp_md, self._ffi.NULL, flags
        )
        if res != 1:
            # The only failure translated for the caller is a mismatch
            # between responder_cert and private_key; anything else trips
            # the openssl_assert.
            errors = self._consume_errors()
            self.openssl_assert(
                errors[0]._lib_reason_match(
                    self._lib.ERR_LIB_X509,
                    self._lib.X509_R_KEY_VALUES_MISMATCH
                )
            )
            raise ValueError("responder_cert must be signed by private_key")

        return basic
1569
1570    def create_ocsp_response(self, response_status, builder, private_key,
1571                             algorithm):
1572        if response_status is ocsp.OCSPResponseStatus.SUCCESSFUL:
1573            basic = self._create_ocsp_basic_response(
1574                builder, private_key, algorithm
1575            )
1576        else:
1577            basic = self._ffi.NULL
1578
1579        ocsp_resp = self._lib.OCSP_response_create(
1580            response_status.value, basic
1581        )
1582        self.openssl_assert(ocsp_resp != self._ffi.NULL)
1583        ocsp_resp = self._ffi.gc(ocsp_resp, self._lib.OCSP_RESPONSE_free)
1584        return _OCSPResponse(self, ocsp_resp)
1585
1586    def elliptic_curve_exchange_algorithm_supported(self, algorithm, curve):
1587        return (
1588            self.elliptic_curve_supported(curve) and
1589            isinstance(algorithm, ec.ECDH)
1590        )
1591
1592    def _ec_cdata_to_evp_pkey(self, ec_cdata):
1593        evp_pkey = self._create_evp_pkey_gc()
1594        res = self._lib.EVP_PKEY_set1_EC_KEY(evp_pkey, ec_cdata)
1595        self.openssl_assert(res == 1)
1596        return evp_pkey
1597
1598    def _elliptic_curve_to_nid(self, curve):
1599        """
1600        Get the NID for a curve name.
1601        """
1602
1603        curve_aliases = {
1604            "secp192r1": "prime192v1",
1605            "secp256r1": "prime256v1"
1606        }
1607
1608        curve_name = curve_aliases.get(curve.name, curve.name)
1609
1610        curve_nid = self._lib.OBJ_sn2nid(curve_name.encode())
1611        if curve_nid == self._lib.NID_undef:
1612            raise UnsupportedAlgorithm(
1613                "{0} is not a supported elliptic curve".format(curve.name),
1614                _Reasons.UNSUPPORTED_ELLIPTIC_CURVE
1615            )
1616        return curve_nid
1617
1618    @contextmanager
1619    def _tmp_bn_ctx(self):
1620        bn_ctx = self._lib.BN_CTX_new()
1621        self.openssl_assert(bn_ctx != self._ffi.NULL)
1622        bn_ctx = self._ffi.gc(bn_ctx, self._lib.BN_CTX_free)
1623        self._lib.BN_CTX_start(bn_ctx)
1624        try:
1625            yield bn_ctx
1626        finally:
1627            self._lib.BN_CTX_end(bn_ctx)
1628
1629    def _ec_key_determine_group_get_func(self, ctx):
1630        """
1631        Given an EC_KEY determine the group and what function is required to
1632        get point coordinates.
1633        """
1634        self.openssl_assert(ctx != self._ffi.NULL)
1635
1636        nid_two_field = self._lib.OBJ_sn2nid(b"characteristic-two-field")
1637        self.openssl_assert(nid_two_field != self._lib.NID_undef)
1638
1639        group = self._lib.EC_KEY_get0_group(ctx)
1640        self.openssl_assert(group != self._ffi.NULL)
1641
1642        method = self._lib.EC_GROUP_method_of(group)
1643        self.openssl_assert(method != self._ffi.NULL)
1644
1645        nid = self._lib.EC_METHOD_get_field_type(method)
1646        self.openssl_assert(nid != self._lib.NID_undef)
1647
1648        if nid == nid_two_field and self._lib.Cryptography_HAS_EC2M:
1649            get_func = self._lib.EC_POINT_get_affine_coordinates_GF2m
1650        else:
1651            get_func = self._lib.EC_POINT_get_affine_coordinates_GFp
1652
1653        assert get_func
1654
1655        return get_func, group
1656
1657    def _ec_key_set_public_key_affine_coordinates(self, ctx, x, y):
1658        """
1659        Sets the public key point in the EC_KEY context to the affine x and y
1660        values.
1661        """
1662
1663        if x < 0 or y < 0:
1664            raise ValueError(
1665                "Invalid EC key. Both x and y must be non-negative."
1666            )
1667
1668        x = self._ffi.gc(self._int_to_bn(x), self._lib.BN_free)
1669        y = self._ffi.gc(self._int_to_bn(y), self._lib.BN_free)
1670        res = self._lib.EC_KEY_set_public_key_affine_coordinates(ctx, x, y)
1671        if res != 1:
1672            self._consume_errors()
1673            raise ValueError("Invalid EC key.")
1674
1675        return ctx
1676
1677    def _private_key_bytes(self, encoding, format, encryption_algorithm,
1678                           evp_pkey, cdata):
1679        if not isinstance(format, serialization.PrivateFormat):
1680            raise TypeError(
1681                "format must be an item from the PrivateFormat enum"
1682            )
1683
1684        # X9.62 encoding is only valid for EC public keys
1685        if encoding is serialization.Encoding.X962:
1686            raise ValueError("X9.62 format is only valid for EC public keys")
1687
1688        # Raw format and encoding are only valid for X25519, Ed25519, X448, and
1689        # Ed448 keys. We capture those cases before this method is called so if
1690        # we see those enum values here it means the caller has passed them to
1691        # a key that doesn't support raw type
1692        if format is serialization.PrivateFormat.Raw:
1693            raise ValueError("raw format is invalid with this key or encoding")
1694
1695        if encoding is serialization.Encoding.Raw:
1696            raise ValueError("raw encoding is invalid with this key or format")
1697
1698        if not isinstance(encryption_algorithm,
1699                          serialization.KeySerializationEncryption):
1700            raise TypeError(
1701                "Encryption algorithm must be a KeySerializationEncryption "
1702                "instance"
1703            )
1704
1705        if isinstance(encryption_algorithm, serialization.NoEncryption):
1706            password = b""
1707            passlen = 0
1708            evp_cipher = self._ffi.NULL
1709        elif isinstance(encryption_algorithm,
1710                        serialization.BestAvailableEncryption):
1711            # This is a curated value that we will update over time.
1712            evp_cipher = self._lib.EVP_get_cipherbyname(
1713                b"aes-256-cbc"
1714            )
1715            password = encryption_algorithm.password
1716            passlen = len(password)
1717            if passlen > 1023:
1718                raise ValueError(
1719                    "Passwords longer than 1023 bytes are not supported by "
1720                    "this backend"
1721                )
1722        else:
1723            raise ValueError("Unsupported encryption type")
1724
1725        key_type = self._lib.EVP_PKEY_id(evp_pkey)
1726        if encoding is serialization.Encoding.PEM:
1727            if format is serialization.PrivateFormat.PKCS8:
1728                write_bio = self._lib.PEM_write_bio_PKCS8PrivateKey
1729                key = evp_pkey
1730            else:
1731                assert format is serialization.PrivateFormat.TraditionalOpenSSL
1732                if key_type == self._lib.EVP_PKEY_RSA:
1733                    write_bio = self._lib.PEM_write_bio_RSAPrivateKey
1734                elif key_type == self._lib.EVP_PKEY_DSA:
1735                    write_bio = self._lib.PEM_write_bio_DSAPrivateKey
1736                else:
1737                    assert key_type == self._lib.EVP_PKEY_EC
1738                    write_bio = self._lib.PEM_write_bio_ECPrivateKey
1739
1740                key = cdata
1741        elif encoding is serialization.Encoding.DER:
1742            if format is serialization.PrivateFormat.TraditionalOpenSSL:
1743                if not isinstance(
1744                    encryption_algorithm, serialization.NoEncryption
1745                ):
1746                    raise ValueError(
1747                        "Encryption is not supported for DER encoded "
1748                        "traditional OpenSSL keys"
1749                    )
1750
1751                return self._private_key_bytes_traditional_der(key_type, cdata)
1752            else:
1753                assert format is serialization.PrivateFormat.PKCS8
1754                write_bio = self._lib.i2d_PKCS8PrivateKey_bio
1755                key = evp_pkey
1756        else:
1757            raise TypeError("encoding must be Encoding.PEM or Encoding.DER")
1758
1759        bio = self._create_mem_bio_gc()
1760        res = write_bio(
1761            bio,
1762            key,
1763            evp_cipher,
1764            password,
1765            passlen,
1766            self._ffi.NULL,
1767            self._ffi.NULL
1768        )
1769        self.openssl_assert(res == 1)
1770        return self._read_mem_bio(bio)
1771
1772    def _private_key_bytes_traditional_der(self, key_type, cdata):
1773        if key_type == self._lib.EVP_PKEY_RSA:
1774            write_bio = self._lib.i2d_RSAPrivateKey_bio
1775        elif key_type == self._lib.EVP_PKEY_EC:
1776            write_bio = self._lib.i2d_ECPrivateKey_bio
1777        else:
1778            self.openssl_assert(key_type == self._lib.EVP_PKEY_DSA)
1779            write_bio = self._lib.i2d_DSAPrivateKey_bio
1780
1781        bio = self._create_mem_bio_gc()
1782        res = write_bio(bio, cdata)
1783        self.openssl_assert(res == 1)
1784        return self._read_mem_bio(bio)
1785
1786    def _public_key_bytes(self, encoding, format, key, evp_pkey, cdata):
1787        if not isinstance(encoding, serialization.Encoding):
1788            raise TypeError("encoding must be an item from the Encoding enum")
1789
1790        # Compressed/UncompressedPoint are only valid for EC keys and those
1791        # cases are handled by the ECPublicKey public_bytes method before this
1792        # method is called
1793        if format in (serialization.PublicFormat.UncompressedPoint,
1794                      serialization.PublicFormat.CompressedPoint):
1795            raise ValueError("Point formats are not valid for this key type")
1796
1797        # Raw format and encoding are only valid for X25519, Ed25519, X448, and
1798        # Ed448 keys. We capture those cases before this method is called so if
1799        # we see those enum values here it means the caller has passed them to
1800        # a key that doesn't support raw type
1801        if format is serialization.PublicFormat.Raw:
1802            raise ValueError("raw format is invalid with this key or encoding")
1803
1804        if encoding is serialization.Encoding.Raw:
1805            raise ValueError("raw encoding is invalid with this key or format")
1806
1807        if (
1808            format is serialization.PublicFormat.OpenSSH or
1809            encoding is serialization.Encoding.OpenSSH
1810        ):
1811            if (
1812                format is not serialization.PublicFormat.OpenSSH or
1813                encoding is not serialization.Encoding.OpenSSH
1814            ):
1815                raise ValueError(
1816                    "OpenSSH format must be used with OpenSSH encoding"
1817                )
1818            return self._openssh_public_key_bytes(key)
1819        elif format is serialization.PublicFormat.SubjectPublicKeyInfo:
1820            if encoding is serialization.Encoding.PEM:
1821                write_bio = self._lib.PEM_write_bio_PUBKEY
1822            else:
1823                assert encoding is serialization.Encoding.DER
1824                write_bio = self._lib.i2d_PUBKEY_bio
1825
1826            key = evp_pkey
1827        elif format is serialization.PublicFormat.PKCS1:
1828            # Only RSA is supported here.
1829            assert self._lib.EVP_PKEY_id(evp_pkey) == self._lib.EVP_PKEY_RSA
1830            if encoding is serialization.Encoding.PEM:
1831                write_bio = self._lib.PEM_write_bio_RSAPublicKey
1832            else:
1833                assert encoding is serialization.Encoding.DER
1834                write_bio = self._lib.i2d_RSAPublicKey_bio
1835
1836            key = cdata
1837        else:
1838            raise TypeError(
1839                "format must be an item from the PublicFormat enum"
1840            )
1841
1842        bio = self._create_mem_bio_gc()
1843        res = write_bio(bio, key)
1844        self.openssl_assert(res == 1)
1845        return self._read_mem_bio(bio)
1846
1847    def _openssh_public_key_bytes(self, key):
1848        if isinstance(key, rsa.RSAPublicKey):
1849            public_numbers = key.public_numbers()
1850            return b"ssh-rsa " + base64.b64encode(
1851                ssh._ssh_write_string(b"ssh-rsa") +
1852                ssh._ssh_write_mpint(public_numbers.e) +
1853                ssh._ssh_write_mpint(public_numbers.n)
1854            )
1855        elif isinstance(key, dsa.DSAPublicKey):
1856            public_numbers = key.public_numbers()
1857            parameter_numbers = public_numbers.parameter_numbers
1858            return b"ssh-dss " + base64.b64encode(
1859                ssh._ssh_write_string(b"ssh-dss") +
1860                ssh._ssh_write_mpint(parameter_numbers.p) +
1861                ssh._ssh_write_mpint(parameter_numbers.q) +
1862                ssh._ssh_write_mpint(parameter_numbers.g) +
1863                ssh._ssh_write_mpint(public_numbers.y)
1864            )
1865        else:
1866            assert isinstance(key, ec.EllipticCurvePublicKey)
1867            public_numbers = key.public_numbers()
1868            try:
1869                curve_name = {
1870                    ec.SECP256R1: b"nistp256",
1871                    ec.SECP384R1: b"nistp384",
1872                    ec.SECP521R1: b"nistp521",
1873                }[type(public_numbers.curve)]
1874            except KeyError:
1875                raise ValueError(
1876                    "Only SECP256R1, SECP384R1, and SECP521R1 curves are "
1877                    "supported by the SSH public key format"
1878                )
1879
1880            point = key.public_bytes(
1881                serialization.Encoding.X962,
1882                serialization.PublicFormat.UncompressedPoint
1883            )
1884            return b"ecdsa-sha2-" + curve_name + b" " + base64.b64encode(
1885                ssh._ssh_write_string(b"ecdsa-sha2-" + curve_name) +
1886                ssh._ssh_write_string(curve_name) +
1887                ssh._ssh_write_string(point)
1888            )
1889
1890    def _parameter_bytes(self, encoding, format, cdata):
1891        if encoding is serialization.Encoding.OpenSSH:
1892            raise TypeError(
1893                "OpenSSH encoding is not supported"
1894            )
1895
1896        # Only DH is supported here currently.
1897        q = self._ffi.new("BIGNUM **")
1898        self._lib.DH_get0_pqg(cdata,
1899                              self._ffi.NULL,
1900                              q,
1901                              self._ffi.NULL)
1902        if encoding is serialization.Encoding.PEM:
1903            if q[0] != self._ffi.NULL:
1904                write_bio = self._lib.PEM_write_bio_DHxparams
1905            else:
1906                write_bio = self._lib.PEM_write_bio_DHparams
1907        elif encoding is serialization.Encoding.DER:
1908            if q[0] != self._ffi.NULL:
1909                write_bio = self._lib.Cryptography_i2d_DHxparams_bio
1910            else:
1911                write_bio = self._lib.i2d_DHparams_bio
1912        else:
1913            raise TypeError("encoding must be an item from the Encoding enum")
1914
1915        bio = self._create_mem_bio_gc()
1916        res = write_bio(bio, cdata)
1917        self.openssl_assert(res == 1)
1918        return self._read_mem_bio(bio)
1919
1920    def generate_dh_parameters(self, generator, key_size):
1921        if key_size < 512:
1922            raise ValueError("DH key_size must be at least 512 bits")
1923
1924        if generator not in (2, 5):
1925            raise ValueError("DH generator must be 2 or 5")
1926
1927        dh_param_cdata = self._lib.DH_new()
1928        self.openssl_assert(dh_param_cdata != self._ffi.NULL)
1929        dh_param_cdata = self._ffi.gc(dh_param_cdata, self._lib.DH_free)
1930
1931        res = self._lib.DH_generate_parameters_ex(
1932            dh_param_cdata,
1933            key_size,
1934            generator,
1935            self._ffi.NULL
1936        )
1937        self.openssl_assert(res == 1)
1938
1939        return _DHParameters(self, dh_param_cdata)
1940
1941    def _dh_cdata_to_evp_pkey(self, dh_cdata):
1942        evp_pkey = self._create_evp_pkey_gc()
1943        res = self._lib.EVP_PKEY_set1_DH(evp_pkey, dh_cdata)
1944        self.openssl_assert(res == 1)
1945        return evp_pkey
1946
1947    def generate_dh_private_key(self, parameters):
1948        dh_key_cdata = _dh_params_dup(parameters._dh_cdata, self)
1949
1950        res = self._lib.DH_generate_key(dh_key_cdata)
1951        self.openssl_assert(res == 1)
1952
1953        evp_pkey = self._dh_cdata_to_evp_pkey(dh_key_cdata)
1954
1955        return _DHPrivateKey(self, dh_key_cdata, evp_pkey)
1956
1957    def generate_dh_private_key_and_parameters(self, generator, key_size):
1958        return self.generate_dh_private_key(
1959            self.generate_dh_parameters(generator, key_size))
1960
1961    def load_dh_private_numbers(self, numbers):
1962        parameter_numbers = numbers.public_numbers.parameter_numbers
1963
1964        dh_cdata = self._lib.DH_new()
1965        self.openssl_assert(dh_cdata != self._ffi.NULL)
1966        dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
1967
1968        p = self._int_to_bn(parameter_numbers.p)
1969        g = self._int_to_bn(parameter_numbers.g)
1970
1971        if parameter_numbers.q is not None:
1972            q = self._int_to_bn(parameter_numbers.q)
1973        else:
1974            q = self._ffi.NULL
1975
1976        pub_key = self._int_to_bn(numbers.public_numbers.y)
1977        priv_key = self._int_to_bn(numbers.x)
1978
1979        res = self._lib.DH_set0_pqg(dh_cdata, p, q, g)
1980        self.openssl_assert(res == 1)
1981
1982        res = self._lib.DH_set0_key(dh_cdata, pub_key, priv_key)
1983        self.openssl_assert(res == 1)
1984
1985        codes = self._ffi.new("int[]", 1)
1986        res = self._lib.Cryptography_DH_check(dh_cdata, codes)
1987        self.openssl_assert(res == 1)
1988
1989        # DH_check will return DH_NOT_SUITABLE_GENERATOR if p % 24 does not
1990        # equal 11 when the generator is 2 (a quadratic nonresidue).
1991        # We want to ignore that error because p % 24 == 23 is also fine.
1992        # Specifically, g is then a quadratic residue. Within the context of
1993        # Diffie-Hellman this means it can only generate half the possible
1994        # values. That sounds bad, but quadratic nonresidues leak a bit of
1995        # the key to the attacker in exchange for having the full key space
1996        # available. See: https://crypto.stackexchange.com/questions/12961
1997        if codes[0] != 0 and not (
1998            parameter_numbers.g == 2 and
1999            codes[0] ^ self._lib.DH_NOT_SUITABLE_GENERATOR == 0
2000        ):
2001            raise ValueError(
2002                "DH private numbers did not pass safety checks."
2003            )
2004
2005        evp_pkey = self._dh_cdata_to_evp_pkey(dh_cdata)
2006
2007        return _DHPrivateKey(self, dh_cdata, evp_pkey)
2008
2009    def load_dh_public_numbers(self, numbers):
2010        dh_cdata = self._lib.DH_new()
2011        self.openssl_assert(dh_cdata != self._ffi.NULL)
2012        dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
2013
2014        parameter_numbers = numbers.parameter_numbers
2015
2016        p = self._int_to_bn(parameter_numbers.p)
2017        g = self._int_to_bn(parameter_numbers.g)
2018
2019        if parameter_numbers.q is not None:
2020            q = self._int_to_bn(parameter_numbers.q)
2021        else:
2022            q = self._ffi.NULL
2023
2024        pub_key = self._int_to_bn(numbers.y)
2025
2026        res = self._lib.DH_set0_pqg(dh_cdata, p, q, g)
2027        self.openssl_assert(res == 1)
2028
2029        res = self._lib.DH_set0_key(dh_cdata, pub_key, self._ffi.NULL)
2030        self.openssl_assert(res == 1)
2031
2032        evp_pkey = self._dh_cdata_to_evp_pkey(dh_cdata)
2033
2034        return _DHPublicKey(self, dh_cdata, evp_pkey)
2035
2036    def load_dh_parameter_numbers(self, numbers):
2037        dh_cdata = self._lib.DH_new()
2038        self.openssl_assert(dh_cdata != self._ffi.NULL)
2039        dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
2040
2041        p = self._int_to_bn(numbers.p)
2042        g = self._int_to_bn(numbers.g)
2043
2044        if numbers.q is not None:
2045            q = self._int_to_bn(numbers.q)
2046        else:
2047            q = self._ffi.NULL
2048
2049        res = self._lib.DH_set0_pqg(dh_cdata, p, q, g)
2050        self.openssl_assert(res == 1)
2051
2052        return _DHParameters(self, dh_cdata)
2053
2054    def dh_parameters_supported(self, p, g, q=None):
2055        dh_cdata = self._lib.DH_new()
2056        self.openssl_assert(dh_cdata != self._ffi.NULL)
2057        dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
2058
2059        p = self._int_to_bn(p)
2060        g = self._int_to_bn(g)
2061
2062        if q is not None:
2063            q = self._int_to_bn(q)
2064        else:
2065            q = self._ffi.NULL
2066
2067        res = self._lib.DH_set0_pqg(dh_cdata, p, q, g)
2068        self.openssl_assert(res == 1)
2069
2070        codes = self._ffi.new("int[]", 1)
2071        res = self._lib.Cryptography_DH_check(dh_cdata, codes)
2072        self.openssl_assert(res == 1)
2073
2074        return codes[0] == 0
2075
2076    def dh_x942_serialization_supported(self):
2077        return self._lib.Cryptography_HAS_EVP_PKEY_DHX == 1
2078
2079    def x509_name_bytes(self, name):
2080        x509_name = _encode_name_gc(self, name)
2081        pp = self._ffi.new("unsigned char **")
2082        res = self._lib.i2d_X509_NAME(x509_name, pp)
2083        self.openssl_assert(pp[0] != self._ffi.NULL)
2084        pp = self._ffi.gc(
2085            pp, lambda pointer: self._lib.OPENSSL_free(pointer[0])
2086        )
2087        self.openssl_assert(res > 0)
2088        return self._ffi.buffer(pp[0], res)[:]
2089
2090    def x25519_load_public_bytes(self, data):
2091        # When we drop support for CRYPTOGRAPHY_OPENSSL_LESS_THAN_111 we can
2092        # switch this to EVP_PKEY_new_raw_public_key
2093        if len(data) != 32:
2094            raise ValueError("An X25519 public key is 32 bytes long")
2095
2096        evp_pkey = self._create_evp_pkey_gc()
2097        res = self._lib.EVP_PKEY_set_type(evp_pkey, self._lib.NID_X25519)
2098        backend.openssl_assert(res == 1)
2099        res = self._lib.EVP_PKEY_set1_tls_encodedpoint(
2100            evp_pkey, data, len(data)
2101        )
2102        backend.openssl_assert(res == 1)
2103        return _X25519PublicKey(self, evp_pkey)
2104
2105    def x25519_load_private_bytes(self, data):
2106        # When we drop support for CRYPTOGRAPHY_OPENSSL_LESS_THAN_111 we can
2107        # switch this to EVP_PKEY_new_raw_private_key and drop the
2108        # zeroed_bytearray garbage.
2109        # OpenSSL only has facilities for loading PKCS8 formatted private
2110        # keys using the algorithm identifiers specified in
2111        # https://tools.ietf.org/html/draft-ietf-curdle-pkix-09.
2112        # This is the standard PKCS8 prefix for a 32 byte X25519 key.
2113        # The form is:
2114        #    0:d=0  hl=2 l=  46 cons: SEQUENCE
2115        #    2:d=1  hl=2 l=   1 prim: INTEGER           :00
2116        #    5:d=1  hl=2 l=   5 cons: SEQUENCE
2117        #    7:d=2  hl=2 l=   3 prim: OBJECT            :1.3.101.110
2118        #    12:d=1  hl=2 l=  34 prim: OCTET STRING      (the key)
2119        # Of course there's a bit more complexity. In reality OCTET STRING
2120        # contains an OCTET STRING of length 32! So the last two bytes here
2121        # are \x04\x20, which is an OCTET STRING of length 32.
2122        if len(data) != 32:
2123            raise ValueError("An X25519 private key is 32 bytes long")
2124
2125        pkcs8_prefix = b'0.\x02\x01\x000\x05\x06\x03+en\x04"\x04 '
2126        with self._zeroed_bytearray(48) as ba:
2127            ba[0:16] = pkcs8_prefix
2128            ba[16:] = data
2129            bio = self._bytes_to_bio(ba)
2130            evp_pkey = backend._lib.d2i_PrivateKey_bio(bio.bio, self._ffi.NULL)
2131
2132        self.openssl_assert(evp_pkey != self._ffi.NULL)
2133        evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
2134        self.openssl_assert(
2135            self._lib.EVP_PKEY_id(evp_pkey) == self._lib.EVP_PKEY_X25519
2136        )
2137        return _X25519PrivateKey(self, evp_pkey)
2138
2139    def _evp_pkey_keygen_gc(self, nid):
2140        evp_pkey_ctx = self._lib.EVP_PKEY_CTX_new_id(nid, self._ffi.NULL)
2141        self.openssl_assert(evp_pkey_ctx != self._ffi.NULL)
2142        evp_pkey_ctx = self._ffi.gc(evp_pkey_ctx, self._lib.EVP_PKEY_CTX_free)
2143        res = self._lib.EVP_PKEY_keygen_init(evp_pkey_ctx)
2144        self.openssl_assert(res == 1)
2145        evp_ppkey = self._ffi.new("EVP_PKEY **")
2146        res = self._lib.EVP_PKEY_keygen(evp_pkey_ctx, evp_ppkey)
2147        self.openssl_assert(res == 1)
2148        self.openssl_assert(evp_ppkey[0] != self._ffi.NULL)
2149        evp_pkey = self._ffi.gc(evp_ppkey[0], self._lib.EVP_PKEY_free)
2150        return evp_pkey
2151
2152    def x25519_generate_key(self):
2153        evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_X25519)
2154        return _X25519PrivateKey(self, evp_pkey)
2155
2156    def x25519_supported(self):
2157        return self._lib.CRYPTOGRAPHY_OPENSSL_110_OR_GREATER
2158
2159    def x448_load_public_bytes(self, data):
2160        if len(data) != 56:
2161            raise ValueError("An X448 public key is 56 bytes long")
2162
2163        evp_pkey = self._lib.EVP_PKEY_new_raw_public_key(
2164            self._lib.NID_X448, self._ffi.NULL, data, len(data)
2165        )
2166        self.openssl_assert(evp_pkey != self._ffi.NULL)
2167        evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
2168        return _X448PublicKey(self, evp_pkey)
2169
2170    def x448_load_private_bytes(self, data):
2171        if len(data) != 56:
2172            raise ValueError("An X448 private key is 56 bytes long")
2173
2174        data_ptr = self._ffi.from_buffer(data)
2175        evp_pkey = self._lib.EVP_PKEY_new_raw_private_key(
2176            self._lib.NID_X448, self._ffi.NULL, data_ptr, len(data)
2177        )
2178        self.openssl_assert(evp_pkey != self._ffi.NULL)
2179        evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
2180        return _X448PrivateKey(self, evp_pkey)
2181
2182    def x448_generate_key(self):
2183        evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_X448)
2184        return _X448PrivateKey(self, evp_pkey)
2185
2186    def x448_supported(self):
2187        return not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111
2188
2189    def derive_scrypt(self, key_material, salt, length, n, r, p):
2190        buf = self._ffi.new("unsigned char[]", length)
2191        key_material_ptr = self._ffi.from_buffer(key_material)
2192        res = self._lib.EVP_PBE_scrypt(
2193            key_material_ptr, len(key_material), salt, len(salt), n, r, p,
2194            scrypt._MEM_LIMIT, buf, length
2195        )
2196        if res != 1:
2197            errors = self._consume_errors()
2198            if not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111:
2199                # This error is only added to the stack in 1.1.1+
2200                self.openssl_assert(
2201                    errors[0]._lib_reason_match(
2202                        self._lib.ERR_LIB_EVP,
2203                        self._lib.ERR_R_MALLOC_FAILURE
2204                    ) or
2205                    errors[0]._lib_reason_match(
2206                        self._lib.ERR_LIB_EVP,
2207                        self._lib.EVP_R_MEMORY_LIMIT_EXCEEDED
2208                    )
2209                )
2210
2211            # memory required formula explained here:
2212            # https://blog.filippo.io/the-scrypt-parameters/
2213            min_memory = 128 * n * r // (1024**2)
2214            raise MemoryError(
2215                "Not enough memory to derive key. These parameters require"
2216                " {} MB of memory.".format(min_memory)
2217            )
2218        return self._ffi.buffer(buf)[:]
2219
2220    def aead_cipher_supported(self, cipher):
2221        cipher_name = aead._aead_cipher_name(cipher)
2222        return (
2223            self._lib.EVP_get_cipherbyname(cipher_name) != self._ffi.NULL
2224        )
2225
2226    @contextlib.contextmanager
2227    def _zeroed_bytearray(self, length):
2228        """
2229        This method creates a bytearray, which we copy data into (hopefully
2230        also from a mutable buffer that can be dynamically erased!), and then
2231        zero when we're done.
2232        """
2233        ba = bytearray(length)
2234        try:
2235            yield ba
2236        finally:
2237            self._zero_data(ba, length)
2238
2239    def _zero_data(self, data, length):
2240        # We clear things this way because at the moment we're not
2241        # sure of a better way that can guarantee it overwrites the
2242        # memory of a bytearray and doesn't just replace the underlying char *.
2243        for i in range(length):
2244            data[i] = 0
2245
2246    @contextlib.contextmanager
2247    def _zeroed_null_terminated_buf(self, data):
2248        """
2249        This method takes bytes, which can be a bytestring or a mutable
2250        buffer like a bytearray, and yields a null-terminated version of that
2251        data. This is required because PKCS12_parse doesn't take a length with
2252        its password char * and ffi.from_buffer doesn't provide null
2253        termination. So, to support zeroing the data via bytearray we
2254        need to build this ridiculous construct that copies the memory, but
2255        zeroes it after use.
2256        """
2257        if data is None:
2258            yield self._ffi.NULL
2259        else:
2260            data_len = len(data)
2261            buf = self._ffi.new("char[]", data_len + 1)
2262            self._ffi.memmove(buf, data, data_len)
2263            try:
2264                yield buf
2265            finally:
2266                # Cast to a uint8_t * so we can assign by integer
2267                self._zero_data(self._ffi.cast("uint8_t *", buf), data_len)
2268
2269    def load_key_and_certificates_from_pkcs12(self, data, password):
2270        if password is not None:
2271            utils._check_byteslike("password", password)
2272
2273        bio = self._bytes_to_bio(data)
2274        p12 = self._lib.d2i_PKCS12_bio(bio.bio, self._ffi.NULL)
2275        if p12 == self._ffi.NULL:
2276            self._consume_errors()
2277            raise ValueError("Could not deserialize PKCS12 data")
2278
2279        p12 = self._ffi.gc(p12, self._lib.PKCS12_free)
2280        evp_pkey_ptr = self._ffi.new("EVP_PKEY **")
2281        x509_ptr = self._ffi.new("X509 **")
2282        sk_x509_ptr = self._ffi.new("Cryptography_STACK_OF_X509 **")
2283        with self._zeroed_null_terminated_buf(password) as password_buf:
2284            res = self._lib.PKCS12_parse(
2285                p12, password_buf, evp_pkey_ptr, x509_ptr, sk_x509_ptr
2286            )
2287
2288        if res == 0:
2289            self._consume_errors()
2290            raise ValueError("Invalid password or PKCS12 data")
2291
2292        cert = None
2293        key = None
2294        additional_certificates = []
2295
2296        if evp_pkey_ptr[0] != self._ffi.NULL:
2297            evp_pkey = self._ffi.gc(evp_pkey_ptr[0], self._lib.EVP_PKEY_free)
2298            key = self._evp_pkey_to_private_key(evp_pkey)
2299
2300        if x509_ptr[0] != self._ffi.NULL:
2301            x509 = self._ffi.gc(x509_ptr[0], self._lib.X509_free)
2302            cert = _Certificate(self, x509)
2303
2304        if sk_x509_ptr[0] != self._ffi.NULL:
2305            sk_x509 = self._ffi.gc(sk_x509_ptr[0], self._lib.sk_X509_free)
2306            num = self._lib.sk_X509_num(sk_x509_ptr[0])
2307            for i in range(num):
2308                x509 = self._lib.sk_X509_value(sk_x509, i)
2309                x509 = self._ffi.gc(x509, self._lib.X509_free)
2310                self.openssl_assert(x509 != self._ffi.NULL)
2311                additional_certificates.append(_Certificate(self, x509))
2312
2313        return (key, cert, additional_certificates)
2314
2315
class GetCipherByName(object):
    """
    Callable that resolves an EVP cipher by name.

    The name is produced from a format string that may reference the
    ``cipher`` and ``mode`` objects passed at call time.
    """

    def __init__(self, fmt):
        # Template for the OpenSSL cipher name; filled in on every call.
        self._fmt = fmt

    def __call__(self, backend, cipher, mode):
        """
        Render the template for ``cipher`` and ``mode``, lowercase it, and
        return whatever ``EVP_get_cipherbyname`` yields for the
        ASCII-encoded name.
        """
        rendered = self._fmt.format(cipher=cipher, mode=mode)
        evp_name = rendered.lower().encode("ascii")
        return backend._lib.EVP_get_cipherbyname(evp_name)
2323
2324
2325def _get_xts_cipher(backend, cipher, mode):
2326    cipher_name = "aes-{0}-xts".format(cipher.key_size // 2)
2327    return backend._lib.EVP_get_cipherbyname(cipher_name.encode("ascii"))
2328
2329
# Module-level Backend instance, constructed when this module is imported.
backend = Backend()
2331