• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
4
5from __future__ import absolute_import, division, print_function
6
7import collections
8import contextlib
9import itertools
10import warnings
11from contextlib import contextmanager
12
13import six
14from six.moves import range
15
16from cryptography import utils, x509
17from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
18from cryptography.hazmat._der import (
19    INTEGER,
20    NULL,
21    SEQUENCE,
22    encode_der,
23    encode_der_integer,
24)
25from cryptography.hazmat.backends.interfaces import (
26    CMACBackend,
27    CipherBackend,
28    DERSerializationBackend,
29    DHBackend,
30    DSABackend,
31    EllipticCurveBackend,
32    HMACBackend,
33    HashBackend,
34    PBKDF2HMACBackend,
35    PEMSerializationBackend,
36    RSABackend,
37    ScryptBackend,
38    X509Backend,
39)
40from cryptography.hazmat.backends.openssl import aead
41from cryptography.hazmat.backends.openssl.ciphers import _CipherContext
42from cryptography.hazmat.backends.openssl.cmac import _CMACContext
43from cryptography.hazmat.backends.openssl.decode_asn1 import (
44    _CRL_ENTRY_REASON_ENUM_TO_CODE,
45    _CRL_EXTENSION_HANDLERS,
46    _EXTENSION_HANDLERS_BASE,
47    _EXTENSION_HANDLERS_SCT,
48    _OCSP_BASICRESP_EXTENSION_HANDLERS,
49    _OCSP_REQ_EXTENSION_HANDLERS,
50    _OCSP_SINGLERESP_EXTENSION_HANDLERS_SCT,
51    _REVOKED_EXTENSION_HANDLERS,
52    _X509ExtensionParser,
53)
54from cryptography.hazmat.backends.openssl.dh import (
55    _DHParameters,
56    _DHPrivateKey,
57    _DHPublicKey,
58    _dh_params_dup,
59)
60from cryptography.hazmat.backends.openssl.dsa import (
61    _DSAParameters,
62    _DSAPrivateKey,
63    _DSAPublicKey,
64)
65from cryptography.hazmat.backends.openssl.ec import (
66    _EllipticCurvePrivateKey,
67    _EllipticCurvePublicKey,
68)
69from cryptography.hazmat.backends.openssl.ed25519 import (
70    _Ed25519PrivateKey,
71    _Ed25519PublicKey,
72)
73from cryptography.hazmat.backends.openssl.ed448 import (
74    _ED448_KEY_SIZE,
75    _Ed448PrivateKey,
76    _Ed448PublicKey,
77)
78from cryptography.hazmat.backends.openssl.encode_asn1 import (
79    _CRL_ENTRY_EXTENSION_ENCODE_HANDLERS,
80    _CRL_EXTENSION_ENCODE_HANDLERS,
81    _EXTENSION_ENCODE_HANDLERS,
82    _OCSP_BASICRESP_EXTENSION_ENCODE_HANDLERS,
83    _OCSP_REQUEST_EXTENSION_ENCODE_HANDLERS,
84    _encode_asn1_int_gc,
85    _encode_asn1_str_gc,
86    _encode_name_gc,
87    _txt2obj_gc,
88)
89from cryptography.hazmat.backends.openssl.hashes import _HashContext
90from cryptography.hazmat.backends.openssl.hmac import _HMACContext
91from cryptography.hazmat.backends.openssl.ocsp import (
92    _OCSPRequest,
93    _OCSPResponse,
94)
95from cryptography.hazmat.backends.openssl.poly1305 import (
96    _POLY1305_KEY_SIZE,
97    _Poly1305Context,
98)
99from cryptography.hazmat.backends.openssl.rsa import (
100    _RSAPrivateKey,
101    _RSAPublicKey,
102)
103from cryptography.hazmat.backends.openssl.x25519 import (
104    _X25519PrivateKey,
105    _X25519PublicKey,
106)
107from cryptography.hazmat.backends.openssl.x448 import (
108    _X448PrivateKey,
109    _X448PublicKey,
110)
111from cryptography.hazmat.backends.openssl.x509 import (
112    _Certificate,
113    _CertificateRevocationList,
114    _CertificateSigningRequest,
115    _RevokedCertificate,
116)
117from cryptography.hazmat.bindings.openssl import binding
118from cryptography.hazmat.primitives import hashes, serialization
119from cryptography.hazmat.primitives.asymmetric import (
120    dh,
121    dsa,
122    ec,
123    ed25519,
124    ed448,
125    rsa,
126)
127from cryptography.hazmat.primitives.asymmetric.padding import (
128    MGF1,
129    OAEP,
130    PKCS1v15,
131    PSS,
132)
133from cryptography.hazmat.primitives.ciphers.algorithms import (
134    AES,
135    ARC4,
136    Blowfish,
137    CAST5,
138    Camellia,
139    ChaCha20,
140    IDEA,
141    SEED,
142    TripleDES,
143)
144from cryptography.hazmat.primitives.ciphers.modes import (
145    CBC,
146    CFB,
147    CFB8,
148    CTR,
149    ECB,
150    GCM,
151    OFB,
152    XTS,
153)
154from cryptography.hazmat.primitives.kdf import scrypt
155from cryptography.hazmat.primitives.serialization import pkcs7, ssh
156from cryptography.x509 import ocsp
157
158
159_MemoryBIO = collections.namedtuple("_MemoryBIO", ["bio", "char_ptr"])
160
161
162# Not actually supported, just used as a marker for some serialization tests.
163class _RC2(object):
164    pass
165
166
167@utils.register_interface(CipherBackend)
168@utils.register_interface(CMACBackend)
169@utils.register_interface(DERSerializationBackend)
170@utils.register_interface(DHBackend)
171@utils.register_interface(DSABackend)
172@utils.register_interface(EllipticCurveBackend)
173@utils.register_interface(HashBackend)
174@utils.register_interface(HMACBackend)
175@utils.register_interface(PBKDF2HMACBackend)
176@utils.register_interface(RSABackend)
177@utils.register_interface(PEMSerializationBackend)
178@utils.register_interface(X509Backend)
179@utils.register_interface_if(
180    binding.Binding().lib.Cryptography_HAS_SCRYPT, ScryptBackend
181)
182class Backend(object):
183    """
184    OpenSSL API binding interfaces.
185    """
186
187    name = "openssl"
188
189    # FIPS has opinions about acceptable algorithms and key sizes, but the
190    # disallowed algorithms are still present in OpenSSL. They just error if
191    # you try to use them. To avoid that we allowlist the algorithms in
192    # FIPS 140-3. This isn't ideal, but FIPS 140-3 is trash so here we are.
193    _fips_aead = {
194        b"aes-128-ccm",
195        b"aes-192-ccm",
196        b"aes-256-ccm",
197        b"aes-128-gcm",
198        b"aes-192-gcm",
199        b"aes-256-gcm",
200    }
201    _fips_ciphers = (AES, TripleDES)
202    _fips_hashes = (
203        hashes.SHA1,
204        hashes.SHA224,
205        hashes.SHA256,
206        hashes.SHA384,
207        hashes.SHA512,
208        hashes.SHA512_224,
209        hashes.SHA512_256,
210        hashes.SHA3_224,
211        hashes.SHA3_256,
212        hashes.SHA3_384,
213        hashes.SHA3_512,
214        hashes.SHAKE128,
215        hashes.SHAKE256,
216    )
217    _fips_rsa_min_key_size = 2048
218    _fips_rsa_min_public_exponent = 65537
219    _fips_dsa_min_modulus = 1 << 2048
220    _fips_dh_min_key_size = 2048
221    _fips_dh_min_modulus = 1 << _fips_dh_min_key_size
222
223    def __init__(self):
224        self._binding = binding.Binding()
225        self._ffi = self._binding.ffi
226        self._lib = self._binding.lib
227        self._fips_enabled = self._is_fips_enabled()
228
229        self._cipher_registry = {}
230        self._register_default_ciphers()
231        self._register_x509_ext_parsers()
232        self._register_x509_encoders()
233        if self._fips_enabled and self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE:
234            warnings.warn(
235                "OpenSSL FIPS mode is enabled. Can't enable DRBG fork safety.",
236                UserWarning,
237            )
238        else:
239            self.activate_osrandom_engine()
240        self._dh_types = [self._lib.EVP_PKEY_DH]
241        if self._lib.Cryptography_HAS_EVP_PKEY_DHX:
242            self._dh_types.append(self._lib.EVP_PKEY_DHX)
243
244    def openssl_assert(self, ok, errors=None):
245        return binding._openssl_assert(self._lib, ok, errors=errors)
246
247    def _is_fips_enabled(self):
248        fips_mode = getattr(self._lib, "FIPS_mode", lambda: 0)
249        mode = fips_mode()
250        if mode == 0:
251            # OpenSSL without FIPS pushes an error on the error stack
252            self._lib.ERR_clear_error()
253        return bool(mode)
254
255    def activate_builtin_random(self):
256        if self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE:
257            # Obtain a new structural reference.
258            e = self._lib.ENGINE_get_default_RAND()
259            if e != self._ffi.NULL:
260                self._lib.ENGINE_unregister_RAND(e)
261                # Reset the RNG to use the built-in.
262                res = self._lib.RAND_set_rand_method(self._ffi.NULL)
263                self.openssl_assert(res == 1)
264                # decrement the structural reference from get_default_RAND
265                res = self._lib.ENGINE_finish(e)
266                self.openssl_assert(res == 1)
267
268    @contextlib.contextmanager
269    def _get_osurandom_engine(self):
270        # Fetches an engine by id and returns it. This creates a structural
271        # reference.
272        e = self._lib.ENGINE_by_id(self._lib.Cryptography_osrandom_engine_id)
273        self.openssl_assert(e != self._ffi.NULL)
274        # Initialize the engine for use. This adds a functional reference.
275        res = self._lib.ENGINE_init(e)
276        self.openssl_assert(res == 1)
277
278        try:
279            yield e
280        finally:
281            # Decrement the structural ref incremented by ENGINE_by_id.
282            res = self._lib.ENGINE_free(e)
283            self.openssl_assert(res == 1)
284            # Decrement the functional ref incremented by ENGINE_init.
285            res = self._lib.ENGINE_finish(e)
286            self.openssl_assert(res == 1)
287
288    def activate_osrandom_engine(self):
289        if self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE:
290            # Unregister and free the current engine.
291            self.activate_builtin_random()
292            with self._get_osurandom_engine() as e:
293                # Set the engine as the default RAND provider.
294                res = self._lib.ENGINE_set_default_RAND(e)
295                self.openssl_assert(res == 1)
296            # Reset the RNG to use the engine
297            res = self._lib.RAND_set_rand_method(self._ffi.NULL)
298            self.openssl_assert(res == 1)
299
300    def osrandom_engine_implementation(self):
301        buf = self._ffi.new("char[]", 64)
302        with self._get_osurandom_engine() as e:
303            res = self._lib.ENGINE_ctrl_cmd(
304                e, b"get_implementation", len(buf), buf, self._ffi.NULL, 0
305            )
306            self.openssl_assert(res > 0)
307        return self._ffi.string(buf).decode("ascii")
308
309    def openssl_version_text(self):
310        """
311        Friendly string name of the loaded OpenSSL library. This is not
312        necessarily the same version as it was compiled against.
313
314        Example: OpenSSL 1.1.1d  10 Sep 2019
315        """
316        return self._ffi.string(
317            self._lib.OpenSSL_version(self._lib.OPENSSL_VERSION)
318        ).decode("ascii")
319
320    def openssl_version_number(self):
321        return self._lib.OpenSSL_version_num()
322
323    def create_hmac_ctx(self, key, algorithm):
324        return _HMACContext(self, key, algorithm)
325
326    def _evp_md_from_algorithm(self, algorithm):
327        if algorithm.name == "blake2b" or algorithm.name == "blake2s":
328            alg = "{}{}".format(
329                algorithm.name, algorithm.digest_size * 8
330            ).encode("ascii")
331        else:
332            alg = algorithm.name.encode("ascii")
333
334        evp_md = self._lib.EVP_get_digestbyname(alg)
335        return evp_md
336
337    def _evp_md_non_null_from_algorithm(self, algorithm):
338        evp_md = self._evp_md_from_algorithm(algorithm)
339        self.openssl_assert(evp_md != self._ffi.NULL)
340        return evp_md
341
342    def hash_supported(self, algorithm):
343        if self._fips_enabled and not isinstance(algorithm, self._fips_hashes):
344            return False
345
346        evp_md = self._evp_md_from_algorithm(algorithm)
347        return evp_md != self._ffi.NULL
348
349    def hmac_supported(self, algorithm):
350        return self.hash_supported(algorithm)
351
352    def create_hash_ctx(self, algorithm):
353        return _HashContext(self, algorithm)
354
355    def cipher_supported(self, cipher, mode):
356        if self._fips_enabled and not isinstance(cipher, self._fips_ciphers):
357            return False
358        try:
359            adapter = self._cipher_registry[type(cipher), type(mode)]
360        except KeyError:
361            return False
362        evp_cipher = adapter(self, cipher, mode)
363        return self._ffi.NULL != evp_cipher
364
365    def register_cipher_adapter(self, cipher_cls, mode_cls, adapter):
366        if (cipher_cls, mode_cls) in self._cipher_registry:
367            raise ValueError(
368                "Duplicate registration for: {} {}.".format(
369                    cipher_cls, mode_cls
370                )
371            )
372        self._cipher_registry[cipher_cls, mode_cls] = adapter
373
374    def _register_default_ciphers(self):
375        for mode_cls in [CBC, CTR, ECB, OFB, CFB, CFB8, GCM]:
376            self.register_cipher_adapter(
377                AES,
378                mode_cls,
379                GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}"),
380            )
381        for mode_cls in [CBC, CTR, ECB, OFB, CFB]:
382            self.register_cipher_adapter(
383                Camellia,
384                mode_cls,
385                GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}"),
386            )
387        for mode_cls in [CBC, CFB, CFB8, OFB]:
388            self.register_cipher_adapter(
389                TripleDES, mode_cls, GetCipherByName("des-ede3-{mode.name}")
390            )
391        self.register_cipher_adapter(
392            TripleDES, ECB, GetCipherByName("des-ede3")
393        )
394        for mode_cls in [CBC, CFB, OFB, ECB]:
395            self.register_cipher_adapter(
396                Blowfish, mode_cls, GetCipherByName("bf-{mode.name}")
397            )
398        for mode_cls in [CBC, CFB, OFB, ECB]:
399            self.register_cipher_adapter(
400                SEED, mode_cls, GetCipherByName("seed-{mode.name}")
401            )
402        for cipher_cls, mode_cls in itertools.product(
403            [CAST5, IDEA],
404            [CBC, OFB, CFB, ECB],
405        ):
406            self.register_cipher_adapter(
407                cipher_cls,
408                mode_cls,
409                GetCipherByName("{cipher.name}-{mode.name}"),
410            )
411        self.register_cipher_adapter(ARC4, type(None), GetCipherByName("rc4"))
412        # We don't actually support RC2, this is just used by some tests.
413        self.register_cipher_adapter(_RC2, type(None), GetCipherByName("rc2"))
414        self.register_cipher_adapter(
415            ChaCha20, type(None), GetCipherByName("chacha20")
416        )
417        self.register_cipher_adapter(AES, XTS, _get_xts_cipher)
418
419    def _register_x509_ext_parsers(self):
420        ext_handlers = _EXTENSION_HANDLERS_BASE.copy()
421        # All revoked extensions are valid single response extensions, see:
422        # https://tools.ietf.org/html/rfc6960#section-4.4.5
423        singleresp_handlers = _REVOKED_EXTENSION_HANDLERS.copy()
424
425        if self._lib.Cryptography_HAS_SCT:
426            ext_handlers.update(_EXTENSION_HANDLERS_SCT)
427            singleresp_handlers.update(_OCSP_SINGLERESP_EXTENSION_HANDLERS_SCT)
428
429        self._certificate_extension_parser = _X509ExtensionParser(
430            self,
431            ext_count=self._lib.X509_get_ext_count,
432            get_ext=self._lib.X509_get_ext,
433            handlers=ext_handlers,
434        )
435        self._csr_extension_parser = _X509ExtensionParser(
436            self,
437            ext_count=self._lib.sk_X509_EXTENSION_num,
438            get_ext=self._lib.sk_X509_EXTENSION_value,
439            handlers=ext_handlers,
440        )
441        self._revoked_cert_extension_parser = _X509ExtensionParser(
442            self,
443            ext_count=self._lib.X509_REVOKED_get_ext_count,
444            get_ext=self._lib.X509_REVOKED_get_ext,
445            handlers=_REVOKED_EXTENSION_HANDLERS,
446        )
447        self._crl_extension_parser = _X509ExtensionParser(
448            self,
449            ext_count=self._lib.X509_CRL_get_ext_count,
450            get_ext=self._lib.X509_CRL_get_ext,
451            handlers=_CRL_EXTENSION_HANDLERS,
452        )
453        self._ocsp_req_ext_parser = _X509ExtensionParser(
454            self,
455            ext_count=self._lib.OCSP_REQUEST_get_ext_count,
456            get_ext=self._lib.OCSP_REQUEST_get_ext,
457            handlers=_OCSP_REQ_EXTENSION_HANDLERS,
458        )
459        self._ocsp_basicresp_ext_parser = _X509ExtensionParser(
460            self,
461            ext_count=self._lib.OCSP_BASICRESP_get_ext_count,
462            get_ext=self._lib.OCSP_BASICRESP_get_ext,
463            handlers=_OCSP_BASICRESP_EXTENSION_HANDLERS,
464        )
465        self._ocsp_singleresp_ext_parser = _X509ExtensionParser(
466            self,
467            ext_count=self._lib.OCSP_SINGLERESP_get_ext_count,
468            get_ext=self._lib.OCSP_SINGLERESP_get_ext,
469            handlers=singleresp_handlers,
470        )
471
472    def _register_x509_encoders(self):
473        self._extension_encode_handlers = _EXTENSION_ENCODE_HANDLERS.copy()
474        self._crl_extension_encode_handlers = (
475            _CRL_EXTENSION_ENCODE_HANDLERS.copy()
476        )
477        self._crl_entry_extension_encode_handlers = (
478            _CRL_ENTRY_EXTENSION_ENCODE_HANDLERS.copy()
479        )
480        self._ocsp_request_extension_encode_handlers = (
481            _OCSP_REQUEST_EXTENSION_ENCODE_HANDLERS.copy()
482        )
483        self._ocsp_basicresp_extension_encode_handlers = (
484            _OCSP_BASICRESP_EXTENSION_ENCODE_HANDLERS.copy()
485        )
486
487    def create_symmetric_encryption_ctx(self, cipher, mode):
488        return _CipherContext(self, cipher, mode, _CipherContext._ENCRYPT)
489
490    def create_symmetric_decryption_ctx(self, cipher, mode):
491        return _CipherContext(self, cipher, mode, _CipherContext._DECRYPT)
492
493    def pbkdf2_hmac_supported(self, algorithm):
494        return self.hmac_supported(algorithm)
495
496    def derive_pbkdf2_hmac(
497        self, algorithm, length, salt, iterations, key_material
498    ):
499        buf = self._ffi.new("unsigned char[]", length)
500        evp_md = self._evp_md_non_null_from_algorithm(algorithm)
501        key_material_ptr = self._ffi.from_buffer(key_material)
502        res = self._lib.PKCS5_PBKDF2_HMAC(
503            key_material_ptr,
504            len(key_material),
505            salt,
506            len(salt),
507            iterations,
508            evp_md,
509            length,
510            buf,
511        )
512        self.openssl_assert(res == 1)
513        return self._ffi.buffer(buf)[:]
514
515    def _consume_errors(self):
516        return binding._consume_errors(self._lib)
517
518    def _consume_errors_with_text(self):
519        return binding._consume_errors_with_text(self._lib)
520
521    def _bn_to_int(self, bn):
522        assert bn != self._ffi.NULL
523
524        if not six.PY2:
525            # Python 3 has constant time from_bytes, so use that.
526            bn_num_bytes = self._lib.BN_num_bytes(bn)
527            bin_ptr = self._ffi.new("unsigned char[]", bn_num_bytes)
528            bin_len = self._lib.BN_bn2bin(bn, bin_ptr)
529            # A zero length means the BN has value 0
530            self.openssl_assert(bin_len >= 0)
531            val = int.from_bytes(self._ffi.buffer(bin_ptr)[:bin_len], "big")
532            if self._lib.BN_is_negative(bn):
533                val = -val
534            return val
535        else:
536            # Under Python 2 the best we can do is hex()
537            hex_cdata = self._lib.BN_bn2hex(bn)
538            self.openssl_assert(hex_cdata != self._ffi.NULL)
539            hex_str = self._ffi.string(hex_cdata)
540            self._lib.OPENSSL_free(hex_cdata)
541            return int(hex_str, 16)
542
543    def _int_to_bn(self, num, bn=None):
544        """
545        Converts a python integer to a BIGNUM. The returned BIGNUM will not
546        be garbage collected (to support adding them to structs that take
547        ownership of the object). Be sure to register it for GC if it will
548        be discarded after use.
549        """
550        assert bn is None or bn != self._ffi.NULL
551
552        if bn is None:
553            bn = self._ffi.NULL
554
555        if not six.PY2:
556            # Python 3 has constant time to_bytes, so use that.
557
558            binary = num.to_bytes(int(num.bit_length() / 8.0 + 1), "big")
559            bn_ptr = self._lib.BN_bin2bn(binary, len(binary), bn)
560            self.openssl_assert(bn_ptr != self._ffi.NULL)
561            return bn_ptr
562
563        else:
564            # Under Python 2 the best we can do is hex(), [2:] removes the 0x
565            # prefix.
566            hex_num = hex(num).rstrip("L")[2:].encode("ascii")
567            bn_ptr = self._ffi.new("BIGNUM **")
568            bn_ptr[0] = bn
569            res = self._lib.BN_hex2bn(bn_ptr, hex_num)
570            self.openssl_assert(res != 0)
571            self.openssl_assert(bn_ptr[0] != self._ffi.NULL)
572            return bn_ptr[0]
573
574    def generate_rsa_private_key(self, public_exponent, key_size):
575        rsa._verify_rsa_parameters(public_exponent, key_size)
576
577        rsa_cdata = self._lib.RSA_new()
578        self.openssl_assert(rsa_cdata != self._ffi.NULL)
579        rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
580
581        bn = self._int_to_bn(public_exponent)
582        bn = self._ffi.gc(bn, self._lib.BN_free)
583
584        res = self._lib.RSA_generate_key_ex(
585            rsa_cdata, key_size, bn, self._ffi.NULL
586        )
587        self.openssl_assert(res == 1)
588        evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
589
590        return _RSAPrivateKey(self, rsa_cdata, evp_pkey)
591
592    def generate_rsa_parameters_supported(self, public_exponent, key_size):
593        return (
594            public_exponent >= 3
595            and public_exponent & 1 != 0
596            and key_size >= 512
597        )
598
599    def load_rsa_private_numbers(self, numbers):
600        rsa._check_private_key_components(
601            numbers.p,
602            numbers.q,
603            numbers.d,
604            numbers.dmp1,
605            numbers.dmq1,
606            numbers.iqmp,
607            numbers.public_numbers.e,
608            numbers.public_numbers.n,
609        )
610        rsa_cdata = self._lib.RSA_new()
611        self.openssl_assert(rsa_cdata != self._ffi.NULL)
612        rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
613        p = self._int_to_bn(numbers.p)
614        q = self._int_to_bn(numbers.q)
615        d = self._int_to_bn(numbers.d)
616        dmp1 = self._int_to_bn(numbers.dmp1)
617        dmq1 = self._int_to_bn(numbers.dmq1)
618        iqmp = self._int_to_bn(numbers.iqmp)
619        e = self._int_to_bn(numbers.public_numbers.e)
620        n = self._int_to_bn(numbers.public_numbers.n)
621        res = self._lib.RSA_set0_factors(rsa_cdata, p, q)
622        self.openssl_assert(res == 1)
623        res = self._lib.RSA_set0_key(rsa_cdata, n, e, d)
624        self.openssl_assert(res == 1)
625        res = self._lib.RSA_set0_crt_params(rsa_cdata, dmp1, dmq1, iqmp)
626        self.openssl_assert(res == 1)
627        evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
628
629        return _RSAPrivateKey(self, rsa_cdata, evp_pkey)
630
631    def load_rsa_public_numbers(self, numbers):
632        rsa._check_public_key_components(numbers.e, numbers.n)
633        rsa_cdata = self._lib.RSA_new()
634        self.openssl_assert(rsa_cdata != self._ffi.NULL)
635        rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
636        e = self._int_to_bn(numbers.e)
637        n = self._int_to_bn(numbers.n)
638        res = self._lib.RSA_set0_key(rsa_cdata, n, e, self._ffi.NULL)
639        self.openssl_assert(res == 1)
640        evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
641
642        return _RSAPublicKey(self, rsa_cdata, evp_pkey)
643
644    def _create_evp_pkey_gc(self):
645        evp_pkey = self._lib.EVP_PKEY_new()
646        self.openssl_assert(evp_pkey != self._ffi.NULL)
647        evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
648        return evp_pkey
649
650    def _rsa_cdata_to_evp_pkey(self, rsa_cdata):
651        evp_pkey = self._create_evp_pkey_gc()
652        res = self._lib.EVP_PKEY_set1_RSA(evp_pkey, rsa_cdata)
653        self.openssl_assert(res == 1)
654        return evp_pkey
655
656    def _bytes_to_bio(self, data):
657        """
658        Return a _MemoryBIO namedtuple of (BIO, char*).
659
660        The char* is the storage for the BIO and it must stay alive until the
661        BIO is finished with.
662        """
663        data_ptr = self._ffi.from_buffer(data)
664        bio = self._lib.BIO_new_mem_buf(data_ptr, len(data))
665        self.openssl_assert(bio != self._ffi.NULL)
666
667        return _MemoryBIO(self._ffi.gc(bio, self._lib.BIO_free), data_ptr)
668
669    def _create_mem_bio_gc(self):
670        """
671        Creates an empty memory BIO.
672        """
673        bio_method = self._lib.BIO_s_mem()
674        self.openssl_assert(bio_method != self._ffi.NULL)
675        bio = self._lib.BIO_new(bio_method)
676        self.openssl_assert(bio != self._ffi.NULL)
677        bio = self._ffi.gc(bio, self._lib.BIO_free)
678        return bio
679
680    def _read_mem_bio(self, bio):
681        """
682        Reads a memory BIO. This only works on memory BIOs.
683        """
684        buf = self._ffi.new("char **")
685        buf_len = self._lib.BIO_get_mem_data(bio, buf)
686        self.openssl_assert(buf_len > 0)
687        self.openssl_assert(buf[0] != self._ffi.NULL)
688        bio_data = self._ffi.buffer(buf[0], buf_len)[:]
689        return bio_data
690
691    def _evp_pkey_to_private_key(self, evp_pkey):
692        """
693        Return the appropriate type of PrivateKey given an evp_pkey cdata
694        pointer.
695        """
696
697        key_type = self._lib.EVP_PKEY_id(evp_pkey)
698
699        if key_type == self._lib.EVP_PKEY_RSA:
700            rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
701            self.openssl_assert(rsa_cdata != self._ffi.NULL)
702            rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
703            return _RSAPrivateKey(self, rsa_cdata, evp_pkey)
704        elif key_type == self._lib.EVP_PKEY_DSA:
705            dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey)
706            self.openssl_assert(dsa_cdata != self._ffi.NULL)
707            dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
708            return _DSAPrivateKey(self, dsa_cdata, evp_pkey)
709        elif key_type == self._lib.EVP_PKEY_EC:
710            ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey)
711            self.openssl_assert(ec_cdata != self._ffi.NULL)
712            ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
713            return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
714        elif key_type in self._dh_types:
715            dh_cdata = self._lib.EVP_PKEY_get1_DH(evp_pkey)
716            self.openssl_assert(dh_cdata != self._ffi.NULL)
717            dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
718            return _DHPrivateKey(self, dh_cdata, evp_pkey)
719        elif key_type == getattr(self._lib, "EVP_PKEY_ED25519", None):
720            # EVP_PKEY_ED25519 is not present in OpenSSL < 1.1.1
721            return _Ed25519PrivateKey(self, evp_pkey)
722        elif key_type == getattr(self._lib, "EVP_PKEY_X448", None):
723            # EVP_PKEY_X448 is not present in OpenSSL < 1.1.1
724            return _X448PrivateKey(self, evp_pkey)
725        elif key_type == getattr(self._lib, "EVP_PKEY_X25519", None):
726            # EVP_PKEY_X25519 is not present in OpenSSL < 1.1.0
727            return _X25519PrivateKey(self, evp_pkey)
728        elif key_type == getattr(self._lib, "EVP_PKEY_ED448", None):
729            # EVP_PKEY_ED448 is not present in OpenSSL < 1.1.1
730            return _Ed448PrivateKey(self, evp_pkey)
731        else:
732            raise UnsupportedAlgorithm("Unsupported key type.")
733
734    def _evp_pkey_to_public_key(self, evp_pkey):
735        """
736        Return the appropriate type of PublicKey given an evp_pkey cdata
737        pointer.
738        """
739
740        key_type = self._lib.EVP_PKEY_id(evp_pkey)
741
742        if key_type == self._lib.EVP_PKEY_RSA:
743            rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
744            self.openssl_assert(rsa_cdata != self._ffi.NULL)
745            rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
746            return _RSAPublicKey(self, rsa_cdata, evp_pkey)
747        elif key_type == self._lib.EVP_PKEY_DSA:
748            dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey)
749            self.openssl_assert(dsa_cdata != self._ffi.NULL)
750            dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
751            return _DSAPublicKey(self, dsa_cdata, evp_pkey)
752        elif key_type == self._lib.EVP_PKEY_EC:
753            ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey)
754            self.openssl_assert(ec_cdata != self._ffi.NULL)
755            ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
756            return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)
757        elif key_type in self._dh_types:
758            dh_cdata = self._lib.EVP_PKEY_get1_DH(evp_pkey)
759            self.openssl_assert(dh_cdata != self._ffi.NULL)
760            dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
761            return _DHPublicKey(self, dh_cdata, evp_pkey)
762        elif key_type == getattr(self._lib, "EVP_PKEY_ED25519", None):
763            # EVP_PKEY_ED25519 is not present in OpenSSL < 1.1.1
764            return _Ed25519PublicKey(self, evp_pkey)
765        elif key_type == getattr(self._lib, "EVP_PKEY_X448", None):
766            # EVP_PKEY_X448 is not present in OpenSSL < 1.1.1
767            return _X448PublicKey(self, evp_pkey)
768        elif key_type == getattr(self._lib, "EVP_PKEY_X25519", None):
769            # EVP_PKEY_X25519 is not present in OpenSSL < 1.1.0
770            return _X25519PublicKey(self, evp_pkey)
771        elif key_type == getattr(self._lib, "EVP_PKEY_ED448", None):
772            # EVP_PKEY_X25519 is not present in OpenSSL < 1.1.1
773            return _Ed448PublicKey(self, evp_pkey)
774        else:
775            raise UnsupportedAlgorithm("Unsupported key type.")
776
777    def _oaep_hash_supported(self, algorithm):
778        if self._lib.Cryptography_HAS_RSA_OAEP_MD:
779            return isinstance(
780                algorithm,
781                (
782                    hashes.SHA1,
783                    hashes.SHA224,
784                    hashes.SHA256,
785                    hashes.SHA384,
786                    hashes.SHA512,
787                ),
788            )
789        else:
790            return isinstance(algorithm, hashes.SHA1)
791
792    def rsa_padding_supported(self, padding):
793        if isinstance(padding, PKCS1v15):
794            return True
795        elif isinstance(padding, PSS) and isinstance(padding._mgf, MGF1):
796            return self.hash_supported(padding._mgf._algorithm)
797        elif isinstance(padding, OAEP) and isinstance(padding._mgf, MGF1):
798            return (
799                self._oaep_hash_supported(padding._mgf._algorithm)
800                and self._oaep_hash_supported(padding._algorithm)
801                and (
802                    (padding._label is None or len(padding._label) == 0)
803                    or self._lib.Cryptography_HAS_RSA_OAEP_LABEL == 1
804                )
805            )
806        else:
807            return False
808
809    def generate_dsa_parameters(self, key_size):
810        if key_size not in (1024, 2048, 3072, 4096):
811            raise ValueError(
812                "Key size must be 1024, 2048, 3072, or 4096 bits."
813            )
814
815        ctx = self._lib.DSA_new()
816        self.openssl_assert(ctx != self._ffi.NULL)
817        ctx = self._ffi.gc(ctx, self._lib.DSA_free)
818
819        res = self._lib.DSA_generate_parameters_ex(
820            ctx,
821            key_size,
822            self._ffi.NULL,
823            0,
824            self._ffi.NULL,
825            self._ffi.NULL,
826            self._ffi.NULL,
827        )
828
829        self.openssl_assert(res == 1)
830
831        return _DSAParameters(self, ctx)
832
833    def generate_dsa_private_key(self, parameters):
834        ctx = self._lib.DSAparams_dup(parameters._dsa_cdata)
835        self.openssl_assert(ctx != self._ffi.NULL)
836        ctx = self._ffi.gc(ctx, self._lib.DSA_free)
837        self._lib.DSA_generate_key(ctx)
838        evp_pkey = self._dsa_cdata_to_evp_pkey(ctx)
839
840        return _DSAPrivateKey(self, ctx, evp_pkey)
841
842    def generate_dsa_private_key_and_parameters(self, key_size):
843        parameters = self.generate_dsa_parameters(key_size)
844        return self.generate_dsa_private_key(parameters)
845
846    def _dsa_cdata_set_values(self, dsa_cdata, p, q, g, pub_key, priv_key):
847        res = self._lib.DSA_set0_pqg(dsa_cdata, p, q, g)
848        self.openssl_assert(res == 1)
849        res = self._lib.DSA_set0_key(dsa_cdata, pub_key, priv_key)
850        self.openssl_assert(res == 1)
851
852    def load_dsa_private_numbers(self, numbers):
853        dsa._check_dsa_private_numbers(numbers)
854        parameter_numbers = numbers.public_numbers.parameter_numbers
855
856        dsa_cdata = self._lib.DSA_new()
857        self.openssl_assert(dsa_cdata != self._ffi.NULL)
858        dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
859
860        p = self._int_to_bn(parameter_numbers.p)
861        q = self._int_to_bn(parameter_numbers.q)
862        g = self._int_to_bn(parameter_numbers.g)
863        pub_key = self._int_to_bn(numbers.public_numbers.y)
864        priv_key = self._int_to_bn(numbers.x)
865        self._dsa_cdata_set_values(dsa_cdata, p, q, g, pub_key, priv_key)
866
867        evp_pkey = self._dsa_cdata_to_evp_pkey(dsa_cdata)
868
869        return _DSAPrivateKey(self, dsa_cdata, evp_pkey)
870
871    def load_dsa_public_numbers(self, numbers):
872        dsa._check_dsa_parameters(numbers.parameter_numbers)
873        dsa_cdata = self._lib.DSA_new()
874        self.openssl_assert(dsa_cdata != self._ffi.NULL)
875        dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
876
877        p = self._int_to_bn(numbers.parameter_numbers.p)
878        q = self._int_to_bn(numbers.parameter_numbers.q)
879        g = self._int_to_bn(numbers.parameter_numbers.g)
880        pub_key = self._int_to_bn(numbers.y)
881        priv_key = self._ffi.NULL
882        self._dsa_cdata_set_values(dsa_cdata, p, q, g, pub_key, priv_key)
883
884        evp_pkey = self._dsa_cdata_to_evp_pkey(dsa_cdata)
885
886        return _DSAPublicKey(self, dsa_cdata, evp_pkey)
887
888    def load_dsa_parameter_numbers(self, numbers):
889        dsa._check_dsa_parameters(numbers)
890        dsa_cdata = self._lib.DSA_new()
891        self.openssl_assert(dsa_cdata != self._ffi.NULL)
892        dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
893
894        p = self._int_to_bn(numbers.p)
895        q = self._int_to_bn(numbers.q)
896        g = self._int_to_bn(numbers.g)
897        res = self._lib.DSA_set0_pqg(dsa_cdata, p, q, g)
898        self.openssl_assert(res == 1)
899
900        return _DSAParameters(self, dsa_cdata)
901
902    def _dsa_cdata_to_evp_pkey(self, dsa_cdata):
903        evp_pkey = self._create_evp_pkey_gc()
904        res = self._lib.EVP_PKEY_set1_DSA(evp_pkey, dsa_cdata)
905        self.openssl_assert(res == 1)
906        return evp_pkey
907
908    def dsa_hash_supported(self, algorithm):
909        return self.hash_supported(algorithm)
910
911    def dsa_parameters_supported(self, p, q, g):
912        return True
913
914    def cmac_algorithm_supported(self, algorithm):
915        return self.cipher_supported(
916            algorithm, CBC(b"\x00" * algorithm.block_size)
917        )
918
919    def create_cmac_ctx(self, algorithm):
920        return _CMACContext(self, algorithm)
921
922    def _x509_check_signature_params(self, private_key, algorithm):
923        if isinstance(
924            private_key, (ed25519.Ed25519PrivateKey, ed448.Ed448PrivateKey)
925        ):
926            if algorithm is not None:
927                raise ValueError(
928                    "algorithm must be None when signing via ed25519 or ed448"
929                )
930        elif not isinstance(
931            private_key,
932            (rsa.RSAPrivateKey, dsa.DSAPrivateKey, ec.EllipticCurvePrivateKey),
933        ):
934            raise TypeError(
935                "Key must be an rsa, dsa, ec, ed25519, or ed448 private key."
936            )
937        elif not isinstance(algorithm, hashes.HashAlgorithm):
938            raise TypeError("Algorithm must be a registered hash algorithm.")
939        elif isinstance(algorithm, hashes.MD5) and not isinstance(
940            private_key, rsa.RSAPrivateKey
941        ):
942            raise ValueError(
943                "MD5 hash algorithm is only supported with RSA keys"
944            )
945
946    def create_x509_csr(self, builder, private_key, algorithm):
947        if not isinstance(builder, x509.CertificateSigningRequestBuilder):
948            raise TypeError("Builder type mismatch.")
949        self._x509_check_signature_params(private_key, algorithm)
950
951        # Resolve the signature algorithm.
952        evp_md = self._evp_md_x509_null_if_eddsa(private_key, algorithm)
953
954        # Create an empty request.
955        x509_req = self._lib.X509_REQ_new()
956        self.openssl_assert(x509_req != self._ffi.NULL)
957        x509_req = self._ffi.gc(x509_req, self._lib.X509_REQ_free)
958
959        # Set x509 version.
960        res = self._lib.X509_REQ_set_version(x509_req, x509.Version.v1.value)
961        self.openssl_assert(res == 1)
962
963        # Set subject name.
964        res = self._lib.X509_REQ_set_subject_name(
965            x509_req, _encode_name_gc(self, builder._subject_name)
966        )
967        self.openssl_assert(res == 1)
968
969        # Set subject public key.
970        public_key = private_key.public_key()
971        res = self._lib.X509_REQ_set_pubkey(x509_req, public_key._evp_pkey)
972        self.openssl_assert(res == 1)
973
974        # Add extensions.
975        sk_extension = self._lib.sk_X509_EXTENSION_new_null()
976        self.openssl_assert(sk_extension != self._ffi.NULL)
977        sk_extension = self._ffi.gc(
978            sk_extension,
979            lambda x: self._lib.sk_X509_EXTENSION_pop_free(
980                x,
981                self._ffi.addressof(
982                    self._lib._original_lib, "X509_EXTENSION_free"
983                ),
984            ),
985        )
986        # Don't GC individual extensions because the memory is owned by
987        # sk_extensions and will be freed along with it.
988        self._create_x509_extensions(
989            extensions=builder._extensions,
990            handlers=self._extension_encode_handlers,
991            x509_obj=sk_extension,
992            add_func=self._lib.sk_X509_EXTENSION_insert,
993            gc=False,
994        )
995        res = self._lib.X509_REQ_add_extensions(x509_req, sk_extension)
996        self.openssl_assert(res == 1)
997
998        # Add attributes (all bytes encoded as ASN1 UTF8_STRING)
999        for attr_oid, attr_val in builder._attributes:
1000            obj = _txt2obj_gc(self, attr_oid.dotted_string)
1001            res = self._lib.X509_REQ_add1_attr_by_OBJ(
1002                x509_req,
1003                obj,
1004                x509.name._ASN1Type.UTF8String.value,
1005                attr_val,
1006                len(attr_val),
1007            )
1008            self.openssl_assert(res == 1)
1009
1010        # Sign the request using the requester's private key.
1011        res = self._lib.X509_REQ_sign(x509_req, private_key._evp_pkey, evp_md)
1012        if res == 0:
1013            errors = self._consume_errors_with_text()
1014            raise ValueError("Signing failed", errors)
1015
1016        return _CertificateSigningRequest(self, x509_req)
1017
1018    def create_x509_certificate(self, builder, private_key, algorithm):
1019        if not isinstance(builder, x509.CertificateBuilder):
1020            raise TypeError("Builder type mismatch.")
1021        self._x509_check_signature_params(private_key, algorithm)
1022
1023        # Resolve the signature algorithm.
1024        evp_md = self._evp_md_x509_null_if_eddsa(private_key, algorithm)
1025
1026        # Create an empty certificate.
1027        x509_cert = self._lib.X509_new()
1028        x509_cert = self._ffi.gc(x509_cert, self._lib.X509_free)
1029
1030        # Set the x509 version.
1031        res = self._lib.X509_set_version(x509_cert, builder._version.value)
1032        self.openssl_assert(res == 1)
1033
1034        # Set the subject's name.
1035        res = self._lib.X509_set_subject_name(
1036            x509_cert, _encode_name_gc(self, builder._subject_name)
1037        )
1038        self.openssl_assert(res == 1)
1039
1040        # Set the subject's public key.
1041        res = self._lib.X509_set_pubkey(
1042            x509_cert, builder._public_key._evp_pkey
1043        )
1044        self.openssl_assert(res == 1)
1045
1046        # Set the certificate serial number.
1047        serial_number = _encode_asn1_int_gc(self, builder._serial_number)
1048        res = self._lib.X509_set_serialNumber(x509_cert, serial_number)
1049        self.openssl_assert(res == 1)
1050
1051        # Set the "not before" time.
1052        self._set_asn1_time(
1053            self._lib.X509_getm_notBefore(x509_cert), builder._not_valid_before
1054        )
1055
1056        # Set the "not after" time.
1057        self._set_asn1_time(
1058            self._lib.X509_getm_notAfter(x509_cert), builder._not_valid_after
1059        )
1060
1061        # Add extensions.
1062        self._create_x509_extensions(
1063            extensions=builder._extensions,
1064            handlers=self._extension_encode_handlers,
1065            x509_obj=x509_cert,
1066            add_func=self._lib.X509_add_ext,
1067            gc=True,
1068        )
1069
1070        # Set the issuer name.
1071        res = self._lib.X509_set_issuer_name(
1072            x509_cert, _encode_name_gc(self, builder._issuer_name)
1073        )
1074        self.openssl_assert(res == 1)
1075
1076        # Sign the certificate with the issuer's private key.
1077        res = self._lib.X509_sign(x509_cert, private_key._evp_pkey, evp_md)
1078        if res == 0:
1079            errors = self._consume_errors_with_text()
1080            raise ValueError("Signing failed", errors)
1081
1082        return _Certificate(self, x509_cert)
1083
1084    def _evp_md_x509_null_if_eddsa(self, private_key, algorithm):
1085        if isinstance(
1086            private_key, (ed25519.Ed25519PrivateKey, ed448.Ed448PrivateKey)
1087        ):
1088            # OpenSSL requires us to pass NULL for EVP_MD for ed25519/ed448
1089            return self._ffi.NULL
1090        else:
1091            return self._evp_md_non_null_from_algorithm(algorithm)
1092
1093    def _set_asn1_time(self, asn1_time, time):
1094        if time.year >= 2050:
1095            asn1_str = time.strftime("%Y%m%d%H%M%SZ").encode("ascii")
1096        else:
1097            asn1_str = time.strftime("%y%m%d%H%M%SZ").encode("ascii")
1098        res = self._lib.ASN1_TIME_set_string(asn1_time, asn1_str)
1099        self.openssl_assert(res == 1)
1100
1101    def _create_asn1_time(self, time):
1102        asn1_time = self._lib.ASN1_TIME_new()
1103        self.openssl_assert(asn1_time != self._ffi.NULL)
1104        asn1_time = self._ffi.gc(asn1_time, self._lib.ASN1_TIME_free)
1105        self._set_asn1_time(asn1_time, time)
1106        return asn1_time
1107
1108    def create_x509_crl(self, builder, private_key, algorithm):
1109        if not isinstance(builder, x509.CertificateRevocationListBuilder):
1110            raise TypeError("Builder type mismatch.")
1111        self._x509_check_signature_params(private_key, algorithm)
1112
1113        evp_md = self._evp_md_x509_null_if_eddsa(private_key, algorithm)
1114
1115        # Create an empty CRL.
1116        x509_crl = self._lib.X509_CRL_new()
1117        x509_crl = self._ffi.gc(x509_crl, self._lib.X509_CRL_free)
1118
1119        # Set the x509 CRL version. We only support v2 (integer value 1).
1120        res = self._lib.X509_CRL_set_version(x509_crl, 1)
1121        self.openssl_assert(res == 1)
1122
1123        # Set the issuer name.
1124        res = self._lib.X509_CRL_set_issuer_name(
1125            x509_crl, _encode_name_gc(self, builder._issuer_name)
1126        )
1127        self.openssl_assert(res == 1)
1128
1129        # Set the last update time.
1130        last_update = self._create_asn1_time(builder._last_update)
1131        res = self._lib.X509_CRL_set_lastUpdate(x509_crl, last_update)
1132        self.openssl_assert(res == 1)
1133
1134        # Set the next update time.
1135        next_update = self._create_asn1_time(builder._next_update)
1136        res = self._lib.X509_CRL_set_nextUpdate(x509_crl, next_update)
1137        self.openssl_assert(res == 1)
1138
1139        # Add extensions.
1140        self._create_x509_extensions(
1141            extensions=builder._extensions,
1142            handlers=self._crl_extension_encode_handlers,
1143            x509_obj=x509_crl,
1144            add_func=self._lib.X509_CRL_add_ext,
1145            gc=True,
1146        )
1147
1148        # add revoked certificates
1149        for revoked_cert in builder._revoked_certificates:
1150            # Duplicating because the X509_CRL takes ownership and will free
1151            # this memory when X509_CRL_free is called.
1152            revoked = self._lib.X509_REVOKED_dup(revoked_cert._x509_revoked)
1153            self.openssl_assert(revoked != self._ffi.NULL)
1154            res = self._lib.X509_CRL_add0_revoked(x509_crl, revoked)
1155            self.openssl_assert(res == 1)
1156
1157        res = self._lib.X509_CRL_sign(x509_crl, private_key._evp_pkey, evp_md)
1158        if res == 0:
1159            errors = self._consume_errors_with_text()
1160            raise ValueError("Signing failed", errors)
1161
1162        return _CertificateRevocationList(self, x509_crl)
1163
1164    def _create_x509_extensions(
1165        self, extensions, handlers, x509_obj, add_func, gc
1166    ):
1167        for i, extension in enumerate(extensions):
1168            x509_extension = self._create_x509_extension(handlers, extension)
1169            self.openssl_assert(x509_extension != self._ffi.NULL)
1170
1171            if gc:
1172                x509_extension = self._ffi.gc(
1173                    x509_extension, self._lib.X509_EXTENSION_free
1174                )
1175            res = add_func(x509_obj, x509_extension, i)
1176            self.openssl_assert(res >= 1)
1177
1178    def _create_raw_x509_extension(self, extension, value):
1179        obj = _txt2obj_gc(self, extension.oid.dotted_string)
1180        return self._lib.X509_EXTENSION_create_by_OBJ(
1181            self._ffi.NULL, obj, 1 if extension.critical else 0, value
1182        )
1183
1184    def _create_x509_extension(self, handlers, extension):
1185        if isinstance(extension.value, x509.UnrecognizedExtension):
1186            value = _encode_asn1_str_gc(self, extension.value.value)
1187            return self._create_raw_x509_extension(extension, value)
1188        elif isinstance(extension.value, x509.TLSFeature):
1189            asn1 = encode_der(
1190                SEQUENCE,
1191                *[
1192                    encode_der(INTEGER, encode_der_integer(x.value))
1193                    for x in extension.value
1194                ]
1195            )
1196            value = _encode_asn1_str_gc(self, asn1)
1197            return self._create_raw_x509_extension(extension, value)
1198        elif isinstance(extension.value, x509.PrecertPoison):
1199            value = _encode_asn1_str_gc(self, encode_der(NULL))
1200            return self._create_raw_x509_extension(extension, value)
1201        else:
1202            try:
1203                encode = handlers[extension.oid]
1204            except KeyError:
1205                raise NotImplementedError(
1206                    "Extension not supported: {}".format(extension.oid)
1207                )
1208
1209            ext_struct = encode(self, extension.value)
1210            nid = self._lib.OBJ_txt2nid(
1211                extension.oid.dotted_string.encode("ascii")
1212            )
1213            self.openssl_assert(nid != self._lib.NID_undef)
1214            return self._lib.X509V3_EXT_i2d(
1215                nid, 1 if extension.critical else 0, ext_struct
1216            )
1217
1218    def create_x509_revoked_certificate(self, builder):
1219        if not isinstance(builder, x509.RevokedCertificateBuilder):
1220            raise TypeError("Builder type mismatch.")
1221
1222        x509_revoked = self._lib.X509_REVOKED_new()
1223        self.openssl_assert(x509_revoked != self._ffi.NULL)
1224        x509_revoked = self._ffi.gc(x509_revoked, self._lib.X509_REVOKED_free)
1225        serial_number = _encode_asn1_int_gc(self, builder._serial_number)
1226        res = self._lib.X509_REVOKED_set_serialNumber(
1227            x509_revoked, serial_number
1228        )
1229        self.openssl_assert(res == 1)
1230        rev_date = self._create_asn1_time(builder._revocation_date)
1231        res = self._lib.X509_REVOKED_set_revocationDate(x509_revoked, rev_date)
1232        self.openssl_assert(res == 1)
1233        # add CRL entry extensions
1234        self._create_x509_extensions(
1235            extensions=builder._extensions,
1236            handlers=self._crl_entry_extension_encode_handlers,
1237            x509_obj=x509_revoked,
1238            add_func=self._lib.X509_REVOKED_add_ext,
1239            gc=True,
1240        )
1241        return _RevokedCertificate(self, None, x509_revoked)
1242
1243    def load_pem_private_key(self, data, password):
1244        return self._load_key(
1245            self._lib.PEM_read_bio_PrivateKey,
1246            self._evp_pkey_to_private_key,
1247            data,
1248            password,
1249        )
1250
1251    def load_pem_public_key(self, data):
1252        mem_bio = self._bytes_to_bio(data)
1253        evp_pkey = self._lib.PEM_read_bio_PUBKEY(
1254            mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
1255        )
1256        if evp_pkey != self._ffi.NULL:
1257            evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
1258            return self._evp_pkey_to_public_key(evp_pkey)
1259        else:
1260            # It's not a (RSA/DSA/ECDSA) subjectPublicKeyInfo, but we still
1261            # need to check to see if it is a pure PKCS1 RSA public key (not
1262            # embedded in a subjectPublicKeyInfo)
1263            self._consume_errors()
1264            res = self._lib.BIO_reset(mem_bio.bio)
1265            self.openssl_assert(res == 1)
1266            rsa_cdata = self._lib.PEM_read_bio_RSAPublicKey(
1267                mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
1268            )
1269            if rsa_cdata != self._ffi.NULL:
1270                rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
1271                evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
1272                return _RSAPublicKey(self, rsa_cdata, evp_pkey)
1273            else:
1274                self._handle_key_loading_error()
1275
1276    def load_pem_parameters(self, data):
1277        mem_bio = self._bytes_to_bio(data)
1278        # only DH is supported currently
1279        dh_cdata = self._lib.PEM_read_bio_DHparams(
1280            mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
1281        )
1282        if dh_cdata != self._ffi.NULL:
1283            dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
1284            return _DHParameters(self, dh_cdata)
1285        else:
1286            self._handle_key_loading_error()
1287
1288    def load_der_private_key(self, data, password):
1289        # OpenSSL has a function called d2i_AutoPrivateKey that in theory
1290        # handles this automatically, however it doesn't handle encrypted
1291        # private keys. Instead we try to load the key two different ways.
1292        # First we'll try to load it as a traditional key.
1293        bio_data = self._bytes_to_bio(data)
1294        key = self._evp_pkey_from_der_traditional_key(bio_data, password)
1295        if key:
1296            return self._evp_pkey_to_private_key(key)
1297        else:
1298            # Finally we try to load it with the method that handles encrypted
1299            # PKCS8 properly.
1300            return self._load_key(
1301                self._lib.d2i_PKCS8PrivateKey_bio,
1302                self._evp_pkey_to_private_key,
1303                data,
1304                password,
1305            )
1306
1307    def _evp_pkey_from_der_traditional_key(self, bio_data, password):
1308        key = self._lib.d2i_PrivateKey_bio(bio_data.bio, self._ffi.NULL)
1309        if key != self._ffi.NULL:
1310            key = self._ffi.gc(key, self._lib.EVP_PKEY_free)
1311            if password is not None:
1312                raise TypeError(
1313                    "Password was given but private key is not encrypted."
1314                )
1315
1316            return key
1317        else:
1318            self._consume_errors()
1319            return None
1320
1321    def load_der_public_key(self, data):
1322        mem_bio = self._bytes_to_bio(data)
1323        evp_pkey = self._lib.d2i_PUBKEY_bio(mem_bio.bio, self._ffi.NULL)
1324        if evp_pkey != self._ffi.NULL:
1325            evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
1326            return self._evp_pkey_to_public_key(evp_pkey)
1327        else:
1328            # It's not a (RSA/DSA/ECDSA) subjectPublicKeyInfo, but we still
1329            # need to check to see if it is a pure PKCS1 RSA public key (not
1330            # embedded in a subjectPublicKeyInfo)
1331            self._consume_errors()
1332            res = self._lib.BIO_reset(mem_bio.bio)
1333            self.openssl_assert(res == 1)
1334            rsa_cdata = self._lib.d2i_RSAPublicKey_bio(
1335                mem_bio.bio, self._ffi.NULL
1336            )
1337            if rsa_cdata != self._ffi.NULL:
1338                rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
1339                evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
1340                return _RSAPublicKey(self, rsa_cdata, evp_pkey)
1341            else:
1342                self._handle_key_loading_error()
1343
1344    def load_der_parameters(self, data):
1345        mem_bio = self._bytes_to_bio(data)
1346        dh_cdata = self._lib.d2i_DHparams_bio(mem_bio.bio, self._ffi.NULL)
1347        if dh_cdata != self._ffi.NULL:
1348            dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
1349            return _DHParameters(self, dh_cdata)
1350        elif self._lib.Cryptography_HAS_EVP_PKEY_DHX:
1351            # We check to see if the is dhx.
1352            self._consume_errors()
1353            res = self._lib.BIO_reset(mem_bio.bio)
1354            self.openssl_assert(res == 1)
1355            dh_cdata = self._lib.Cryptography_d2i_DHxparams_bio(
1356                mem_bio.bio, self._ffi.NULL
1357            )
1358            if dh_cdata != self._ffi.NULL:
1359                dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
1360                return _DHParameters(self, dh_cdata)
1361
1362        self._handle_key_loading_error()
1363
1364    def load_pem_x509_certificate(self, data):
1365        mem_bio = self._bytes_to_bio(data)
1366        x509 = self._lib.PEM_read_bio_X509(
1367            mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
1368        )
1369        if x509 == self._ffi.NULL:
1370            self._consume_errors()
1371            raise ValueError(
1372                "Unable to load certificate. See https://cryptography.io/en/"
1373                "latest/faq.html#why-can-t-i-import-my-pem-file for more"
1374                " details."
1375            )
1376
1377        x509 = self._ffi.gc(x509, self._lib.X509_free)
1378        return _Certificate(self, x509)
1379
1380    def load_der_x509_certificate(self, data):
1381        mem_bio = self._bytes_to_bio(data)
1382        x509 = self._lib.d2i_X509_bio(mem_bio.bio, self._ffi.NULL)
1383        if x509 == self._ffi.NULL:
1384            self._consume_errors()
1385            raise ValueError("Unable to load certificate")
1386
1387        x509 = self._ffi.gc(x509, self._lib.X509_free)
1388        return _Certificate(self, x509)
1389
1390    def load_pem_x509_crl(self, data):
1391        mem_bio = self._bytes_to_bio(data)
1392        x509_crl = self._lib.PEM_read_bio_X509_CRL(
1393            mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
1394        )
1395        if x509_crl == self._ffi.NULL:
1396            self._consume_errors()
1397            raise ValueError(
1398                "Unable to load CRL. See https://cryptography.io/en/la"
1399                "test/faq.html#why-can-t-i-import-my-pem-file for more"
1400                " details."
1401            )
1402
1403        x509_crl = self._ffi.gc(x509_crl, self._lib.X509_CRL_free)
1404        return _CertificateRevocationList(self, x509_crl)
1405
1406    def load_der_x509_crl(self, data):
1407        mem_bio = self._bytes_to_bio(data)
1408        x509_crl = self._lib.d2i_X509_CRL_bio(mem_bio.bio, self._ffi.NULL)
1409        if x509_crl == self._ffi.NULL:
1410            self._consume_errors()
1411            raise ValueError("Unable to load CRL")
1412
1413        x509_crl = self._ffi.gc(x509_crl, self._lib.X509_CRL_free)
1414        return _CertificateRevocationList(self, x509_crl)
1415
1416    def load_pem_x509_csr(self, data):
1417        mem_bio = self._bytes_to_bio(data)
1418        x509_req = self._lib.PEM_read_bio_X509_REQ(
1419            mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
1420        )
1421        if x509_req == self._ffi.NULL:
1422            self._consume_errors()
1423            raise ValueError(
1424                "Unable to load request. See https://cryptography.io/en/"
1425                "latest/faq.html#why-can-t-i-import-my-pem-file for more"
1426                " details."
1427            )
1428
1429        x509_req = self._ffi.gc(x509_req, self._lib.X509_REQ_free)
1430        return _CertificateSigningRequest(self, x509_req)
1431
1432    def load_der_x509_csr(self, data):
1433        mem_bio = self._bytes_to_bio(data)
1434        x509_req = self._lib.d2i_X509_REQ_bio(mem_bio.bio, self._ffi.NULL)
1435        if x509_req == self._ffi.NULL:
1436            self._consume_errors()
1437            raise ValueError("Unable to load request")
1438
1439        x509_req = self._ffi.gc(x509_req, self._lib.X509_REQ_free)
1440        return _CertificateSigningRequest(self, x509_req)
1441
1442    def _load_key(self, openssl_read_func, convert_func, data, password):
1443        mem_bio = self._bytes_to_bio(data)
1444
1445        userdata = self._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *")
1446        if password is not None:
1447            utils._check_byteslike("password", password)
1448            password_ptr = self._ffi.from_buffer(password)
1449            userdata.password = password_ptr
1450            userdata.length = len(password)
1451
1452        evp_pkey = openssl_read_func(
1453            mem_bio.bio,
1454            self._ffi.NULL,
1455            self._ffi.addressof(
1456                self._lib._original_lib, "Cryptography_pem_password_cb"
1457            ),
1458            userdata,
1459        )
1460
1461        if evp_pkey == self._ffi.NULL:
1462            if userdata.error != 0:
1463                self._consume_errors()
1464                if userdata.error == -1:
1465                    raise TypeError(
1466                        "Password was not given but private key is encrypted"
1467                    )
1468                else:
1469                    assert userdata.error == -2
1470                    raise ValueError(
1471                        "Passwords longer than {} bytes are not supported "
1472                        "by this backend.".format(userdata.maxsize - 1)
1473                    )
1474            else:
1475                self._handle_key_loading_error()
1476
1477        evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
1478
1479        if password is not None and userdata.called == 0:
1480            raise TypeError(
1481                "Password was given but private key is not encrypted."
1482            )
1483
1484        assert (
1485            password is not None and userdata.called == 1
1486        ) or password is None
1487
1488        return convert_func(evp_pkey)
1489
1490    def _handle_key_loading_error(self):
1491        errors = self._consume_errors()
1492
1493        if not errors:
1494            raise ValueError(
1495                "Could not deserialize key data. The data may be in an "
1496                "incorrect format or it may be encrypted with an unsupported "
1497                "algorithm."
1498            )
1499        elif errors[0]._lib_reason_match(
1500            self._lib.ERR_LIB_EVP, self._lib.EVP_R_BAD_DECRYPT
1501        ) or errors[0]._lib_reason_match(
1502            self._lib.ERR_LIB_PKCS12,
1503            self._lib.PKCS12_R_PKCS12_CIPHERFINAL_ERROR,
1504        ):
1505            raise ValueError("Bad decrypt. Incorrect password?")
1506
1507        elif any(
1508            error._lib_reason_match(
1509                self._lib.ERR_LIB_EVP,
1510                self._lib.EVP_R_UNSUPPORTED_PRIVATE_KEY_ALGORITHM,
1511            )
1512            for error in errors
1513        ):
1514            raise ValueError("Unsupported public key algorithm.")
1515
1516        else:
1517            raise ValueError(
1518                "Could not deserialize key data. The data may be in an "
1519                "incorrect format or it may be encrypted with an unsupported "
1520                "algorithm."
1521            )
1522
1523    def elliptic_curve_supported(self, curve):
1524        try:
1525            curve_nid = self._elliptic_curve_to_nid(curve)
1526        except UnsupportedAlgorithm:
1527            curve_nid = self._lib.NID_undef
1528
1529        group = self._lib.EC_GROUP_new_by_curve_name(curve_nid)
1530
1531        if group == self._ffi.NULL:
1532            self._consume_errors()
1533            return False
1534        else:
1535            self.openssl_assert(curve_nid != self._lib.NID_undef)
1536            self._lib.EC_GROUP_free(group)
1537            return True
1538
1539    def elliptic_curve_signature_algorithm_supported(
1540        self, signature_algorithm, curve
1541    ):
1542        # We only support ECDSA right now.
1543        if not isinstance(signature_algorithm, ec.ECDSA):
1544            return False
1545
1546        return self.elliptic_curve_supported(curve)
1547
1548    def generate_elliptic_curve_private_key(self, curve):
1549        """
1550        Generate a new private key on the named curve.
1551        """
1552
1553        if self.elliptic_curve_supported(curve):
1554            ec_cdata = self._ec_key_new_by_curve(curve)
1555
1556            res = self._lib.EC_KEY_generate_key(ec_cdata)
1557            self.openssl_assert(res == 1)
1558
1559            evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
1560
1561            return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
1562        else:
1563            raise UnsupportedAlgorithm(
1564                "Backend object does not support {}.".format(curve.name),
1565                _Reasons.UNSUPPORTED_ELLIPTIC_CURVE,
1566            )
1567
1568    def load_elliptic_curve_private_numbers(self, numbers):
1569        public = numbers.public_numbers
1570
1571        ec_cdata = self._ec_key_new_by_curve(public.curve)
1572
1573        private_value = self._ffi.gc(
1574            self._int_to_bn(numbers.private_value), self._lib.BN_clear_free
1575        )
1576        res = self._lib.EC_KEY_set_private_key(ec_cdata, private_value)
1577        self.openssl_assert(res == 1)
1578
1579        ec_cdata = self._ec_key_set_public_key_affine_coordinates(
1580            ec_cdata, public.x, public.y
1581        )
1582
1583        evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
1584
1585        return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
1586
1587    def load_elliptic_curve_public_numbers(self, numbers):
1588        ec_cdata = self._ec_key_new_by_curve(numbers.curve)
1589        ec_cdata = self._ec_key_set_public_key_affine_coordinates(
1590            ec_cdata, numbers.x, numbers.y
1591        )
1592        evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
1593
1594        return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)
1595
1596    def load_elliptic_curve_public_bytes(self, curve, point_bytes):
1597        ec_cdata = self._ec_key_new_by_curve(curve)
1598        group = self._lib.EC_KEY_get0_group(ec_cdata)
1599        self.openssl_assert(group != self._ffi.NULL)
1600        point = self._lib.EC_POINT_new(group)
1601        self.openssl_assert(point != self._ffi.NULL)
1602        point = self._ffi.gc(point, self._lib.EC_POINT_free)
1603        with self._tmp_bn_ctx() as bn_ctx:
1604            res = self._lib.EC_POINT_oct2point(
1605                group, point, point_bytes, len(point_bytes), bn_ctx
1606            )
1607            if res != 1:
1608                self._consume_errors()
1609                raise ValueError("Invalid public bytes for the given curve")
1610
1611        res = self._lib.EC_KEY_set_public_key(ec_cdata, point)
1612        self.openssl_assert(res == 1)
1613        evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
1614        return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)
1615
1616    def derive_elliptic_curve_private_key(self, private_value, curve):
1617        ec_cdata = self._ec_key_new_by_curve(curve)
1618
1619        get_func, group = self._ec_key_determine_group_get_func(ec_cdata)
1620
1621        point = self._lib.EC_POINT_new(group)
1622        self.openssl_assert(point != self._ffi.NULL)
1623        point = self._ffi.gc(point, self._lib.EC_POINT_free)
1624
1625        value = self._int_to_bn(private_value)
1626        value = self._ffi.gc(value, self._lib.BN_clear_free)
1627
1628        with self._tmp_bn_ctx() as bn_ctx:
1629            res = self._lib.EC_POINT_mul(
1630                group, point, value, self._ffi.NULL, self._ffi.NULL, bn_ctx
1631            )
1632            self.openssl_assert(res == 1)
1633
1634            bn_x = self._lib.BN_CTX_get(bn_ctx)
1635            bn_y = self._lib.BN_CTX_get(bn_ctx)
1636
1637            res = get_func(group, point, bn_x, bn_y, bn_ctx)
1638            self.openssl_assert(res == 1)
1639
1640        res = self._lib.EC_KEY_set_public_key(ec_cdata, point)
1641        self.openssl_assert(res == 1)
1642        private = self._int_to_bn(private_value)
1643        private = self._ffi.gc(private, self._lib.BN_clear_free)
1644        res = self._lib.EC_KEY_set_private_key(ec_cdata, private)
1645        self.openssl_assert(res == 1)
1646
1647        evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
1648
1649        return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
1650
1651    def _ec_key_new_by_curve(self, curve):
1652        curve_nid = self._elliptic_curve_to_nid(curve)
1653        return self._ec_key_new_by_curve_nid(curve_nid)
1654
1655    def _ec_key_new_by_curve_nid(self, curve_nid):
1656        ec_cdata = self._lib.EC_KEY_new_by_curve_name(curve_nid)
1657        self.openssl_assert(ec_cdata != self._ffi.NULL)
1658        return self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
1659
1660    def load_der_ocsp_request(self, data):
1661        mem_bio = self._bytes_to_bio(data)
1662        request = self._lib.d2i_OCSP_REQUEST_bio(mem_bio.bio, self._ffi.NULL)
1663        if request == self._ffi.NULL:
1664            self._consume_errors()
1665            raise ValueError("Unable to load OCSP request")
1666
1667        request = self._ffi.gc(request, self._lib.OCSP_REQUEST_free)
1668        return _OCSPRequest(self, request)
1669
1670    def load_der_ocsp_response(self, data):
1671        mem_bio = self._bytes_to_bio(data)
1672        response = self._lib.d2i_OCSP_RESPONSE_bio(mem_bio.bio, self._ffi.NULL)
1673        if response == self._ffi.NULL:
1674            self._consume_errors()
1675            raise ValueError("Unable to load OCSP response")
1676
1677        response = self._ffi.gc(response, self._lib.OCSP_RESPONSE_free)
1678        return _OCSPResponse(self, response)
1679
1680    def create_ocsp_request(self, builder):
1681        ocsp_req = self._lib.OCSP_REQUEST_new()
1682        self.openssl_assert(ocsp_req != self._ffi.NULL)
1683        ocsp_req = self._ffi.gc(ocsp_req, self._lib.OCSP_REQUEST_free)
1684        cert, issuer, algorithm = builder._request
1685        evp_md = self._evp_md_non_null_from_algorithm(algorithm)
1686        certid = self._lib.OCSP_cert_to_id(evp_md, cert._x509, issuer._x509)
1687        self.openssl_assert(certid != self._ffi.NULL)
1688        onereq = self._lib.OCSP_request_add0_id(ocsp_req, certid)
1689        self.openssl_assert(onereq != self._ffi.NULL)
1690        self._create_x509_extensions(
1691            extensions=builder._extensions,
1692            handlers=self._ocsp_request_extension_encode_handlers,
1693            x509_obj=ocsp_req,
1694            add_func=self._lib.OCSP_REQUEST_add_ext,
1695            gc=True,
1696        )
1697        return _OCSPRequest(self, ocsp_req)
1698
1699    def _create_ocsp_basic_response(self, builder, private_key, algorithm):
1700        self._x509_check_signature_params(private_key, algorithm)
1701
1702        basic = self._lib.OCSP_BASICRESP_new()
1703        self.openssl_assert(basic != self._ffi.NULL)
1704        basic = self._ffi.gc(basic, self._lib.OCSP_BASICRESP_free)
1705        evp_md = self._evp_md_non_null_from_algorithm(
1706            builder._response._algorithm
1707        )
1708        certid = self._lib.OCSP_cert_to_id(
1709            evp_md,
1710            builder._response._cert._x509,
1711            builder._response._issuer._x509,
1712        )
1713        self.openssl_assert(certid != self._ffi.NULL)
1714        certid = self._ffi.gc(certid, self._lib.OCSP_CERTID_free)
1715        if builder._response._revocation_reason is None:
1716            reason = -1
1717        else:
1718            reason = _CRL_ENTRY_REASON_ENUM_TO_CODE[
1719                builder._response._revocation_reason
1720            ]
1721        if builder._response._revocation_time is None:
1722            rev_time = self._ffi.NULL
1723        else:
1724            rev_time = self._create_asn1_time(
1725                builder._response._revocation_time
1726            )
1727
1728        next_update = self._ffi.NULL
1729        if builder._response._next_update is not None:
1730            next_update = self._create_asn1_time(
1731                builder._response._next_update
1732            )
1733
1734        this_update = self._create_asn1_time(builder._response._this_update)
1735
1736        res = self._lib.OCSP_basic_add1_status(
1737            basic,
1738            certid,
1739            builder._response._cert_status.value,
1740            reason,
1741            rev_time,
1742            this_update,
1743            next_update,
1744        )
1745        self.openssl_assert(res != self._ffi.NULL)
1746        # okay, now sign the basic structure
1747        evp_md = self._evp_md_x509_null_if_eddsa(private_key, algorithm)
1748        responder_cert, responder_encoding = builder._responder_id
1749        flags = self._lib.OCSP_NOCERTS
1750        if responder_encoding is ocsp.OCSPResponderEncoding.HASH:
1751            flags |= self._lib.OCSP_RESPID_KEY
1752
1753        if builder._certs is not None:
1754            for cert in builder._certs:
1755                res = self._lib.OCSP_basic_add1_cert(basic, cert._x509)
1756                self.openssl_assert(res == 1)
1757
1758        self._create_x509_extensions(
1759            extensions=builder._extensions,
1760            handlers=self._ocsp_basicresp_extension_encode_handlers,
1761            x509_obj=basic,
1762            add_func=self._lib.OCSP_BASICRESP_add_ext,
1763            gc=True,
1764        )
1765
1766        res = self._lib.OCSP_basic_sign(
1767            basic,
1768            responder_cert._x509,
1769            private_key._evp_pkey,
1770            evp_md,
1771            self._ffi.NULL,
1772            flags,
1773        )
1774        if res != 1:
1775            errors = self._consume_errors_with_text()
1776            raise ValueError(
1777                "Error while signing. responder_cert must be signed "
1778                "by private_key",
1779                errors,
1780            )
1781
1782        return basic
1783
1784    def create_ocsp_response(
1785        self, response_status, builder, private_key, algorithm
1786    ):
1787        if response_status is ocsp.OCSPResponseStatus.SUCCESSFUL:
1788            basic = self._create_ocsp_basic_response(
1789                builder, private_key, algorithm
1790            )
1791        else:
1792            basic = self._ffi.NULL
1793
1794        ocsp_resp = self._lib.OCSP_response_create(
1795            response_status.value, basic
1796        )
1797        self.openssl_assert(ocsp_resp != self._ffi.NULL)
1798        ocsp_resp = self._ffi.gc(ocsp_resp, self._lib.OCSP_RESPONSE_free)
1799        return _OCSPResponse(self, ocsp_resp)
1800
1801    def elliptic_curve_exchange_algorithm_supported(self, algorithm, curve):
1802        return self.elliptic_curve_supported(curve) and isinstance(
1803            algorithm, ec.ECDH
1804        )
1805
1806    def _ec_cdata_to_evp_pkey(self, ec_cdata):
1807        evp_pkey = self._create_evp_pkey_gc()
1808        res = self._lib.EVP_PKEY_set1_EC_KEY(evp_pkey, ec_cdata)
1809        self.openssl_assert(res == 1)
1810        return evp_pkey
1811
1812    def _elliptic_curve_to_nid(self, curve):
1813        """
1814        Get the NID for a curve name.
1815        """
1816
1817        curve_aliases = {"secp192r1": "prime192v1", "secp256r1": "prime256v1"}
1818
1819        curve_name = curve_aliases.get(curve.name, curve.name)
1820
1821        curve_nid = self._lib.OBJ_sn2nid(curve_name.encode())
1822        if curve_nid == self._lib.NID_undef:
1823            raise UnsupportedAlgorithm(
1824                "{} is not a supported elliptic curve".format(curve.name),
1825                _Reasons.UNSUPPORTED_ELLIPTIC_CURVE,
1826            )
1827        return curve_nid
1828
1829    @contextmanager
1830    def _tmp_bn_ctx(self):
1831        bn_ctx = self._lib.BN_CTX_new()
1832        self.openssl_assert(bn_ctx != self._ffi.NULL)
1833        bn_ctx = self._ffi.gc(bn_ctx, self._lib.BN_CTX_free)
1834        self._lib.BN_CTX_start(bn_ctx)
1835        try:
1836            yield bn_ctx
1837        finally:
1838            self._lib.BN_CTX_end(bn_ctx)
1839
1840    def _ec_key_determine_group_get_func(self, ctx):
1841        """
1842        Given an EC_KEY determine the group and what function is required to
1843        get point coordinates.
1844        """
1845        self.openssl_assert(ctx != self._ffi.NULL)
1846
1847        nid_two_field = self._lib.OBJ_sn2nid(b"characteristic-two-field")
1848        self.openssl_assert(nid_two_field != self._lib.NID_undef)
1849
1850        group = self._lib.EC_KEY_get0_group(ctx)
1851        self.openssl_assert(group != self._ffi.NULL)
1852
1853        method = self._lib.EC_GROUP_method_of(group)
1854        self.openssl_assert(method != self._ffi.NULL)
1855
1856        nid = self._lib.EC_METHOD_get_field_type(method)
1857        self.openssl_assert(nid != self._lib.NID_undef)
1858
1859        if nid == nid_two_field and self._lib.Cryptography_HAS_EC2M:
1860            get_func = self._lib.EC_POINT_get_affine_coordinates_GF2m
1861        else:
1862            get_func = self._lib.EC_POINT_get_affine_coordinates_GFp
1863
1864        assert get_func
1865
1866        return get_func, group
1867
1868    def _ec_key_set_public_key_affine_coordinates(self, ctx, x, y):
1869        """
1870        Sets the public key point in the EC_KEY context to the affine x and y
1871        values.
1872        """
1873
1874        if x < 0 or y < 0:
1875            raise ValueError(
1876                "Invalid EC key. Both x and y must be non-negative."
1877            )
1878
1879        x = self._ffi.gc(self._int_to_bn(x), self._lib.BN_free)
1880        y = self._ffi.gc(self._int_to_bn(y), self._lib.BN_free)
1881        res = self._lib.EC_KEY_set_public_key_affine_coordinates(ctx, x, y)
1882        if res != 1:
1883            self._consume_errors()
1884            raise ValueError("Invalid EC key.")
1885
1886        return ctx
1887
1888    def _private_key_bytes(
1889        self, encoding, format, encryption_algorithm, key, evp_pkey, cdata
1890    ):
1891        # validate argument types
1892        if not isinstance(encoding, serialization.Encoding):
1893            raise TypeError("encoding must be an item from the Encoding enum")
1894        if not isinstance(format, serialization.PrivateFormat):
1895            raise TypeError(
1896                "format must be an item from the PrivateFormat enum"
1897            )
1898        if not isinstance(
1899            encryption_algorithm, serialization.KeySerializationEncryption
1900        ):
1901            raise TypeError(
1902                "Encryption algorithm must be a KeySerializationEncryption "
1903                "instance"
1904            )
1905
1906        # validate password
1907        if isinstance(encryption_algorithm, serialization.NoEncryption):
1908            password = b""
1909        elif isinstance(
1910            encryption_algorithm, serialization.BestAvailableEncryption
1911        ):
1912            password = encryption_algorithm.password
1913            if len(password) > 1023:
1914                raise ValueError(
1915                    "Passwords longer than 1023 bytes are not supported by "
1916                    "this backend"
1917                )
1918        else:
1919            raise ValueError("Unsupported encryption type")
1920
1921        # PKCS8 + PEM/DER
1922        if format is serialization.PrivateFormat.PKCS8:
1923            if encoding is serialization.Encoding.PEM:
1924                write_bio = self._lib.PEM_write_bio_PKCS8PrivateKey
1925            elif encoding is serialization.Encoding.DER:
1926                write_bio = self._lib.i2d_PKCS8PrivateKey_bio
1927            else:
1928                raise ValueError("Unsupported encoding for PKCS8")
1929            return self._private_key_bytes_via_bio(
1930                write_bio, evp_pkey, password
1931            )
1932
1933        # TraditionalOpenSSL + PEM/DER
1934        if format is serialization.PrivateFormat.TraditionalOpenSSL:
1935            if self._fips_enabled and not isinstance(
1936                encryption_algorithm, serialization.NoEncryption
1937            ):
1938                raise ValueError(
1939                    "Encrypted traditional OpenSSL format is not "
1940                    "supported in FIPS mode."
1941                )
1942            key_type = self._lib.EVP_PKEY_id(evp_pkey)
1943
1944            if encoding is serialization.Encoding.PEM:
1945                if key_type == self._lib.EVP_PKEY_RSA:
1946                    write_bio = self._lib.PEM_write_bio_RSAPrivateKey
1947                elif key_type == self._lib.EVP_PKEY_DSA:
1948                    write_bio = self._lib.PEM_write_bio_DSAPrivateKey
1949                elif key_type == self._lib.EVP_PKEY_EC:
1950                    write_bio = self._lib.PEM_write_bio_ECPrivateKey
1951                else:
1952                    raise ValueError(
1953                        "Unsupported key type for TraditionalOpenSSL"
1954                    )
1955                return self._private_key_bytes_via_bio(
1956                    write_bio, cdata, password
1957                )
1958
1959            if encoding is serialization.Encoding.DER:
1960                if password:
1961                    raise ValueError(
1962                        "Encryption is not supported for DER encoded "
1963                        "traditional OpenSSL keys"
1964                    )
1965                if key_type == self._lib.EVP_PKEY_RSA:
1966                    write_bio = self._lib.i2d_RSAPrivateKey_bio
1967                elif key_type == self._lib.EVP_PKEY_EC:
1968                    write_bio = self._lib.i2d_ECPrivateKey_bio
1969                elif key_type == self._lib.EVP_PKEY_DSA:
1970                    write_bio = self._lib.i2d_DSAPrivateKey_bio
1971                else:
1972                    raise ValueError(
1973                        "Unsupported key type for TraditionalOpenSSL"
1974                    )
1975                return self._bio_func_output(write_bio, cdata)
1976
1977            raise ValueError("Unsupported encoding for TraditionalOpenSSL")
1978
1979        # OpenSSH + PEM
1980        if format is serialization.PrivateFormat.OpenSSH:
1981            if encoding is serialization.Encoding.PEM:
1982                return ssh.serialize_ssh_private_key(key, password)
1983
1984            raise ValueError(
1985                "OpenSSH private key format can only be used"
1986                " with PEM encoding"
1987            )
1988
1989        # Anything that key-specific code was supposed to handle earlier,
1990        # like Raw.
1991        raise ValueError("format is invalid with this key")
1992
1993    def _private_key_bytes_via_bio(self, write_bio, evp_pkey, password):
1994        if not password:
1995            evp_cipher = self._ffi.NULL
1996        else:
1997            # This is a curated value that we will update over time.
1998            evp_cipher = self._lib.EVP_get_cipherbyname(b"aes-256-cbc")
1999
2000        return self._bio_func_output(
2001            write_bio,
2002            evp_pkey,
2003            evp_cipher,
2004            password,
2005            len(password),
2006            self._ffi.NULL,
2007            self._ffi.NULL,
2008        )
2009
2010    def _bio_func_output(self, write_bio, *args):
2011        bio = self._create_mem_bio_gc()
2012        res = write_bio(bio, *args)
2013        self.openssl_assert(res == 1)
2014        return self._read_mem_bio(bio)
2015
2016    def _public_key_bytes(self, encoding, format, key, evp_pkey, cdata):
2017        if not isinstance(encoding, serialization.Encoding):
2018            raise TypeError("encoding must be an item from the Encoding enum")
2019        if not isinstance(format, serialization.PublicFormat):
2020            raise TypeError(
2021                "format must be an item from the PublicFormat enum"
2022            )
2023
2024        # SubjectPublicKeyInfo + PEM/DER
2025        if format is serialization.PublicFormat.SubjectPublicKeyInfo:
2026            if encoding is serialization.Encoding.PEM:
2027                write_bio = self._lib.PEM_write_bio_PUBKEY
2028            elif encoding is serialization.Encoding.DER:
2029                write_bio = self._lib.i2d_PUBKEY_bio
2030            else:
2031                raise ValueError(
2032                    "SubjectPublicKeyInfo works only with PEM or DER encoding"
2033                )
2034            return self._bio_func_output(write_bio, evp_pkey)
2035
2036        # PKCS1 + PEM/DER
2037        if format is serialization.PublicFormat.PKCS1:
2038            # Only RSA is supported here.
2039            key_type = self._lib.EVP_PKEY_id(evp_pkey)
2040            if key_type != self._lib.EVP_PKEY_RSA:
2041                raise ValueError("PKCS1 format is supported only for RSA keys")
2042
2043            if encoding is serialization.Encoding.PEM:
2044                write_bio = self._lib.PEM_write_bio_RSAPublicKey
2045            elif encoding is serialization.Encoding.DER:
2046                write_bio = self._lib.i2d_RSAPublicKey_bio
2047            else:
2048                raise ValueError("PKCS1 works only with PEM or DER encoding")
2049            return self._bio_func_output(write_bio, cdata)
2050
2051        # OpenSSH + OpenSSH
2052        if format is serialization.PublicFormat.OpenSSH:
2053            if encoding is serialization.Encoding.OpenSSH:
2054                return ssh.serialize_ssh_public_key(key)
2055
2056            raise ValueError(
2057                "OpenSSH format must be used with OpenSSH encoding"
2058            )
2059
2060        # Anything that key-specific code was supposed to handle earlier,
2061        # like Raw, CompressedPoint, UncompressedPoint
2062        raise ValueError("format is invalid with this key")
2063
2064    def _parameter_bytes(self, encoding, format, cdata):
2065        if encoding is serialization.Encoding.OpenSSH:
2066            raise TypeError("OpenSSH encoding is not supported")
2067
2068        # Only DH is supported here currently.
2069        q = self._ffi.new("BIGNUM **")
2070        self._lib.DH_get0_pqg(cdata, self._ffi.NULL, q, self._ffi.NULL)
2071        if encoding is serialization.Encoding.PEM:
2072            if q[0] != self._ffi.NULL:
2073                write_bio = self._lib.PEM_write_bio_DHxparams
2074            else:
2075                write_bio = self._lib.PEM_write_bio_DHparams
2076        elif encoding is serialization.Encoding.DER:
2077            if q[0] != self._ffi.NULL:
2078                write_bio = self._lib.Cryptography_i2d_DHxparams_bio
2079            else:
2080                write_bio = self._lib.i2d_DHparams_bio
2081        else:
2082            raise TypeError("encoding must be an item from the Encoding enum")
2083
2084        bio = self._create_mem_bio_gc()
2085        res = write_bio(bio, cdata)
2086        self.openssl_assert(res == 1)
2087        return self._read_mem_bio(bio)
2088
2089    def generate_dh_parameters(self, generator, key_size):
2090        if key_size < dh._MIN_MODULUS_SIZE:
2091            raise ValueError(
2092                "DH key_size must be at least {} bits".format(
2093                    dh._MIN_MODULUS_SIZE
2094                )
2095            )
2096
2097        if generator not in (2, 5):
2098            raise ValueError("DH generator must be 2 or 5")
2099
2100        dh_param_cdata = self._lib.DH_new()
2101        self.openssl_assert(dh_param_cdata != self._ffi.NULL)
2102        dh_param_cdata = self._ffi.gc(dh_param_cdata, self._lib.DH_free)
2103
2104        res = self._lib.DH_generate_parameters_ex(
2105            dh_param_cdata, key_size, generator, self._ffi.NULL
2106        )
2107        self.openssl_assert(res == 1)
2108
2109        return _DHParameters(self, dh_param_cdata)
2110
2111    def _dh_cdata_to_evp_pkey(self, dh_cdata):
2112        evp_pkey = self._create_evp_pkey_gc()
2113        res = self._lib.EVP_PKEY_set1_DH(evp_pkey, dh_cdata)
2114        self.openssl_assert(res == 1)
2115        return evp_pkey
2116
2117    def generate_dh_private_key(self, parameters):
2118        dh_key_cdata = _dh_params_dup(parameters._dh_cdata, self)
2119
2120        res = self._lib.DH_generate_key(dh_key_cdata)
2121        self.openssl_assert(res == 1)
2122
2123        evp_pkey = self._dh_cdata_to_evp_pkey(dh_key_cdata)
2124
2125        return _DHPrivateKey(self, dh_key_cdata, evp_pkey)
2126
2127    def generate_dh_private_key_and_parameters(self, generator, key_size):
2128        return self.generate_dh_private_key(
2129            self.generate_dh_parameters(generator, key_size)
2130        )
2131
2132    def load_dh_private_numbers(self, numbers):
2133        parameter_numbers = numbers.public_numbers.parameter_numbers
2134
2135        dh_cdata = self._lib.DH_new()
2136        self.openssl_assert(dh_cdata != self._ffi.NULL)
2137        dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
2138
2139        p = self._int_to_bn(parameter_numbers.p)
2140        g = self._int_to_bn(parameter_numbers.g)
2141
2142        if parameter_numbers.q is not None:
2143            q = self._int_to_bn(parameter_numbers.q)
2144        else:
2145            q = self._ffi.NULL
2146
2147        pub_key = self._int_to_bn(numbers.public_numbers.y)
2148        priv_key = self._int_to_bn(numbers.x)
2149
2150        res = self._lib.DH_set0_pqg(dh_cdata, p, q, g)
2151        self.openssl_assert(res == 1)
2152
2153        res = self._lib.DH_set0_key(dh_cdata, pub_key, priv_key)
2154        self.openssl_assert(res == 1)
2155
2156        codes = self._ffi.new("int[]", 1)
2157        res = self._lib.Cryptography_DH_check(dh_cdata, codes)
2158        self.openssl_assert(res == 1)
2159
2160        # DH_check will return DH_NOT_SUITABLE_GENERATOR if p % 24 does not
2161        # equal 11 when the generator is 2 (a quadratic nonresidue).
2162        # We want to ignore that error because p % 24 == 23 is also fine.
2163        # Specifically, g is then a quadratic residue. Within the context of
2164        # Diffie-Hellman this means it can only generate half the possible
2165        # values. That sounds bad, but quadratic nonresidues leak a bit of
2166        # the key to the attacker in exchange for having the full key space
2167        # available. See: https://crypto.stackexchange.com/questions/12961
2168        if codes[0] != 0 and not (
2169            parameter_numbers.g == 2
2170            and codes[0] ^ self._lib.DH_NOT_SUITABLE_GENERATOR == 0
2171        ):
2172            raise ValueError("DH private numbers did not pass safety checks.")
2173
2174        evp_pkey = self._dh_cdata_to_evp_pkey(dh_cdata)
2175
2176        return _DHPrivateKey(self, dh_cdata, evp_pkey)
2177
2178    def load_dh_public_numbers(self, numbers):
2179        dh_cdata = self._lib.DH_new()
2180        self.openssl_assert(dh_cdata != self._ffi.NULL)
2181        dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
2182
2183        parameter_numbers = numbers.parameter_numbers
2184
2185        p = self._int_to_bn(parameter_numbers.p)
2186        g = self._int_to_bn(parameter_numbers.g)
2187
2188        if parameter_numbers.q is not None:
2189            q = self._int_to_bn(parameter_numbers.q)
2190        else:
2191            q = self._ffi.NULL
2192
2193        pub_key = self._int_to_bn(numbers.y)
2194
2195        res = self._lib.DH_set0_pqg(dh_cdata, p, q, g)
2196        self.openssl_assert(res == 1)
2197
2198        res = self._lib.DH_set0_key(dh_cdata, pub_key, self._ffi.NULL)
2199        self.openssl_assert(res == 1)
2200
2201        evp_pkey = self._dh_cdata_to_evp_pkey(dh_cdata)
2202
2203        return _DHPublicKey(self, dh_cdata, evp_pkey)
2204
2205    def load_dh_parameter_numbers(self, numbers):
2206        dh_cdata = self._lib.DH_new()
2207        self.openssl_assert(dh_cdata != self._ffi.NULL)
2208        dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
2209
2210        p = self._int_to_bn(numbers.p)
2211        g = self._int_to_bn(numbers.g)
2212
2213        if numbers.q is not None:
2214            q = self._int_to_bn(numbers.q)
2215        else:
2216            q = self._ffi.NULL
2217
2218        res = self._lib.DH_set0_pqg(dh_cdata, p, q, g)
2219        self.openssl_assert(res == 1)
2220
2221        return _DHParameters(self, dh_cdata)
2222
2223    def dh_parameters_supported(self, p, g, q=None):
2224        dh_cdata = self._lib.DH_new()
2225        self.openssl_assert(dh_cdata != self._ffi.NULL)
2226        dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
2227
2228        p = self._int_to_bn(p)
2229        g = self._int_to_bn(g)
2230
2231        if q is not None:
2232            q = self._int_to_bn(q)
2233        else:
2234            q = self._ffi.NULL
2235
2236        res = self._lib.DH_set0_pqg(dh_cdata, p, q, g)
2237        self.openssl_assert(res == 1)
2238
2239        codes = self._ffi.new("int[]", 1)
2240        res = self._lib.Cryptography_DH_check(dh_cdata, codes)
2241        self.openssl_assert(res == 1)
2242
2243        return codes[0] == 0
2244
2245    def dh_x942_serialization_supported(self):
2246        return self._lib.Cryptography_HAS_EVP_PKEY_DHX == 1
2247
2248    def x509_name_bytes(self, name):
2249        x509_name = _encode_name_gc(self, name)
2250        pp = self._ffi.new("unsigned char **")
2251        res = self._lib.i2d_X509_NAME(x509_name, pp)
2252        self.openssl_assert(pp[0] != self._ffi.NULL)
2253        pp = self._ffi.gc(
2254            pp, lambda pointer: self._lib.OPENSSL_free(pointer[0])
2255        )
2256        self.openssl_assert(res > 0)
2257        return self._ffi.buffer(pp[0], res)[:]
2258
2259    def x25519_load_public_bytes(self, data):
2260        # When we drop support for CRYPTOGRAPHY_OPENSSL_LESS_THAN_111 we can
2261        # switch this to EVP_PKEY_new_raw_public_key
2262        if len(data) != 32:
2263            raise ValueError("An X25519 public key is 32 bytes long")
2264
2265        evp_pkey = self._create_evp_pkey_gc()
2266        res = self._lib.EVP_PKEY_set_type(evp_pkey, self._lib.NID_X25519)
2267        self.openssl_assert(res == 1)
2268        res = self._lib.EVP_PKEY_set1_tls_encodedpoint(
2269            evp_pkey, data, len(data)
2270        )
2271        self.openssl_assert(res == 1)
2272        return _X25519PublicKey(self, evp_pkey)
2273
2274    def x25519_load_private_bytes(self, data):
2275        # When we drop support for CRYPTOGRAPHY_OPENSSL_LESS_THAN_111 we can
2276        # switch this to EVP_PKEY_new_raw_private_key and drop the
2277        # zeroed_bytearray garbage.
2278        # OpenSSL only has facilities for loading PKCS8 formatted private
2279        # keys using the algorithm identifiers specified in
2280        # https://tools.ietf.org/html/draft-ietf-curdle-pkix-09.
2281        # This is the standard PKCS8 prefix for a 32 byte X25519 key.
2282        # The form is:
2283        #    0:d=0  hl=2 l=  46 cons: SEQUENCE
2284        #    2:d=1  hl=2 l=   1 prim: INTEGER           :00
2285        #    5:d=1  hl=2 l=   5 cons: SEQUENCE
2286        #    7:d=2  hl=2 l=   3 prim: OBJECT            :1.3.101.110
2287        #    12:d=1  hl=2 l=  34 prim: OCTET STRING      (the key)
2288        # Of course there's a bit more complexity. In reality OCTET STRING
2289        # contains an OCTET STRING of length 32! So the last two bytes here
2290        # are \x04\x20, which is an OCTET STRING of length 32.
2291        if len(data) != 32:
2292            raise ValueError("An X25519 private key is 32 bytes long")
2293
2294        pkcs8_prefix = b'0.\x02\x01\x000\x05\x06\x03+en\x04"\x04 '
2295        with self._zeroed_bytearray(48) as ba:
2296            ba[0:16] = pkcs8_prefix
2297            ba[16:] = data
2298            bio = self._bytes_to_bio(ba)
2299            evp_pkey = self._lib.d2i_PrivateKey_bio(bio.bio, self._ffi.NULL)
2300
2301        self.openssl_assert(evp_pkey != self._ffi.NULL)
2302        evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
2303        self.openssl_assert(
2304            self._lib.EVP_PKEY_id(evp_pkey) == self._lib.EVP_PKEY_X25519
2305        )
2306        return _X25519PrivateKey(self, evp_pkey)
2307
2308    def _evp_pkey_keygen_gc(self, nid):
2309        evp_pkey_ctx = self._lib.EVP_PKEY_CTX_new_id(nid, self._ffi.NULL)
2310        self.openssl_assert(evp_pkey_ctx != self._ffi.NULL)
2311        evp_pkey_ctx = self._ffi.gc(evp_pkey_ctx, self._lib.EVP_PKEY_CTX_free)
2312        res = self._lib.EVP_PKEY_keygen_init(evp_pkey_ctx)
2313        self.openssl_assert(res == 1)
2314        evp_ppkey = self._ffi.new("EVP_PKEY **")
2315        res = self._lib.EVP_PKEY_keygen(evp_pkey_ctx, evp_ppkey)
2316        self.openssl_assert(res == 1)
2317        self.openssl_assert(evp_ppkey[0] != self._ffi.NULL)
2318        evp_pkey = self._ffi.gc(evp_ppkey[0], self._lib.EVP_PKEY_free)
2319        return evp_pkey
2320
2321    def x25519_generate_key(self):
2322        evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_X25519)
2323        return _X25519PrivateKey(self, evp_pkey)
2324
2325    def x25519_supported(self):
2326        if self._fips_enabled:
2327            return False
2328        return not self._lib.CRYPTOGRAPHY_IS_LIBRESSL
2329
2330    def x448_load_public_bytes(self, data):
2331        if len(data) != 56:
2332            raise ValueError("An X448 public key is 56 bytes long")
2333
2334        evp_pkey = self._lib.EVP_PKEY_new_raw_public_key(
2335            self._lib.NID_X448, self._ffi.NULL, data, len(data)
2336        )
2337        self.openssl_assert(evp_pkey != self._ffi.NULL)
2338        evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
2339        return _X448PublicKey(self, evp_pkey)
2340
2341    def x448_load_private_bytes(self, data):
2342        if len(data) != 56:
2343            raise ValueError("An X448 private key is 56 bytes long")
2344
2345        data_ptr = self._ffi.from_buffer(data)
2346        evp_pkey = self._lib.EVP_PKEY_new_raw_private_key(
2347            self._lib.NID_X448, self._ffi.NULL, data_ptr, len(data)
2348        )
2349        self.openssl_assert(evp_pkey != self._ffi.NULL)
2350        evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
2351        return _X448PrivateKey(self, evp_pkey)
2352
2353    def x448_generate_key(self):
2354        evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_X448)
2355        return _X448PrivateKey(self, evp_pkey)
2356
2357    def x448_supported(self):
2358        if self._fips_enabled:
2359            return False
2360        return not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111
2361
2362    def ed25519_supported(self):
2363        if self._fips_enabled:
2364            return False
2365        return not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111B
2366
2367    def ed25519_load_public_bytes(self, data):
2368        utils._check_bytes("data", data)
2369
2370        if len(data) != ed25519._ED25519_KEY_SIZE:
2371            raise ValueError("An Ed25519 public key is 32 bytes long")
2372
2373        evp_pkey = self._lib.EVP_PKEY_new_raw_public_key(
2374            self._lib.NID_ED25519, self._ffi.NULL, data, len(data)
2375        )
2376        self.openssl_assert(evp_pkey != self._ffi.NULL)
2377        evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
2378
2379        return _Ed25519PublicKey(self, evp_pkey)
2380
2381    def ed25519_load_private_bytes(self, data):
2382        if len(data) != ed25519._ED25519_KEY_SIZE:
2383            raise ValueError("An Ed25519 private key is 32 bytes long")
2384
2385        utils._check_byteslike("data", data)
2386        data_ptr = self._ffi.from_buffer(data)
2387        evp_pkey = self._lib.EVP_PKEY_new_raw_private_key(
2388            self._lib.NID_ED25519, self._ffi.NULL, data_ptr, len(data)
2389        )
2390        self.openssl_assert(evp_pkey != self._ffi.NULL)
2391        evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
2392
2393        return _Ed25519PrivateKey(self, evp_pkey)
2394
2395    def ed25519_generate_key(self):
2396        evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_ED25519)
2397        return _Ed25519PrivateKey(self, evp_pkey)
2398
2399    def ed448_supported(self):
2400        if self._fips_enabled:
2401            return False
2402        return not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111B
2403
2404    def ed448_load_public_bytes(self, data):
2405        utils._check_bytes("data", data)
2406        if len(data) != _ED448_KEY_SIZE:
2407            raise ValueError("An Ed448 public key is 57 bytes long")
2408
2409        evp_pkey = self._lib.EVP_PKEY_new_raw_public_key(
2410            self._lib.NID_ED448, self._ffi.NULL, data, len(data)
2411        )
2412        self.openssl_assert(evp_pkey != self._ffi.NULL)
2413        evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
2414
2415        return _Ed448PublicKey(self, evp_pkey)
2416
2417    def ed448_load_private_bytes(self, data):
2418        utils._check_byteslike("data", data)
2419        if len(data) != _ED448_KEY_SIZE:
2420            raise ValueError("An Ed448 private key is 57 bytes long")
2421
2422        data_ptr = self._ffi.from_buffer(data)
2423        evp_pkey = self._lib.EVP_PKEY_new_raw_private_key(
2424            self._lib.NID_ED448, self._ffi.NULL, data_ptr, len(data)
2425        )
2426        self.openssl_assert(evp_pkey != self._ffi.NULL)
2427        evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
2428
2429        return _Ed448PrivateKey(self, evp_pkey)
2430
2431    def ed448_generate_key(self):
2432        evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_ED448)
2433        return _Ed448PrivateKey(self, evp_pkey)
2434
2435    def derive_scrypt(self, key_material, salt, length, n, r, p):
2436        buf = self._ffi.new("unsigned char[]", length)
2437        key_material_ptr = self._ffi.from_buffer(key_material)
2438        res = self._lib.EVP_PBE_scrypt(
2439            key_material_ptr,
2440            len(key_material),
2441            salt,
2442            len(salt),
2443            n,
2444            r,
2445            p,
2446            scrypt._MEM_LIMIT,
2447            buf,
2448            length,
2449        )
2450        if res != 1:
2451            errors = self._consume_errors_with_text()
2452            # memory required formula explained here:
2453            # https://blog.filippo.io/the-scrypt-parameters/
2454            min_memory = 128 * n * r // (1024 ** 2)
2455            raise MemoryError(
2456                "Not enough memory to derive key. These parameters require"
2457                " {} MB of memory.".format(min_memory),
2458                errors,
2459            )
2460        return self._ffi.buffer(buf)[:]
2461
2462    def aead_cipher_supported(self, cipher):
2463        cipher_name = aead._aead_cipher_name(cipher)
2464        if self._fips_enabled and cipher_name not in self._fips_aead:
2465            return False
2466        return self._lib.EVP_get_cipherbyname(cipher_name) != self._ffi.NULL
2467
2468    @contextlib.contextmanager
2469    def _zeroed_bytearray(self, length):
2470        """
2471        This method creates a bytearray, which we copy data into (hopefully
2472        also from a mutable buffer that can be dynamically erased!), and then
2473        zero when we're done.
2474        """
2475        ba = bytearray(length)
2476        try:
2477            yield ba
2478        finally:
2479            self._zero_data(ba, length)
2480
2481    def _zero_data(self, data, length):
2482        # We clear things this way because at the moment we're not
2483        # sure of a better way that can guarantee it overwrites the
2484        # memory of a bytearray and doesn't just replace the underlying char *.
2485        for i in range(length):
2486            data[i] = 0
2487
2488    @contextlib.contextmanager
2489    def _zeroed_null_terminated_buf(self, data):
2490        """
2491        This method takes bytes, which can be a bytestring or a mutable
2492        buffer like a bytearray, and yields a null-terminated version of that
2493        data. This is required because PKCS12_parse doesn't take a length with
2494        its password char * and ffi.from_buffer doesn't provide null
2495        termination. So, to support zeroing the data via bytearray we
2496        need to build this ridiculous construct that copies the memory, but
2497        zeroes it after use.
2498        """
2499        if data is None:
2500            yield self._ffi.NULL
2501        else:
2502            data_len = len(data)
2503            buf = self._ffi.new("char[]", data_len + 1)
2504            self._ffi.memmove(buf, data, data_len)
2505            try:
2506                yield buf
2507            finally:
2508                # Cast to a uint8_t * so we can assign by integer
2509                self._zero_data(self._ffi.cast("uint8_t *", buf), data_len)
2510
2511    def load_key_and_certificates_from_pkcs12(self, data, password):
2512        if password is not None:
2513            utils._check_byteslike("password", password)
2514
2515        bio = self._bytes_to_bio(data)
2516        p12 = self._lib.d2i_PKCS12_bio(bio.bio, self._ffi.NULL)
2517        if p12 == self._ffi.NULL:
2518            self._consume_errors()
2519            raise ValueError("Could not deserialize PKCS12 data")
2520
2521        p12 = self._ffi.gc(p12, self._lib.PKCS12_free)
2522        evp_pkey_ptr = self._ffi.new("EVP_PKEY **")
2523        x509_ptr = self._ffi.new("X509 **")
2524        sk_x509_ptr = self._ffi.new("Cryptography_STACK_OF_X509 **")
2525        with self._zeroed_null_terminated_buf(password) as password_buf:
2526            res = self._lib.PKCS12_parse(
2527                p12, password_buf, evp_pkey_ptr, x509_ptr, sk_x509_ptr
2528            )
2529
2530        if res == 0:
2531            self._consume_errors()
2532            raise ValueError("Invalid password or PKCS12 data")
2533
2534        cert = None
2535        key = None
2536        additional_certificates = []
2537
2538        if evp_pkey_ptr[0] != self._ffi.NULL:
2539            evp_pkey = self._ffi.gc(evp_pkey_ptr[0], self._lib.EVP_PKEY_free)
2540            key = self._evp_pkey_to_private_key(evp_pkey)
2541
2542        if x509_ptr[0] != self._ffi.NULL:
2543            x509 = self._ffi.gc(x509_ptr[0], self._lib.X509_free)
2544            cert = _Certificate(self, x509)
2545
2546        if sk_x509_ptr[0] != self._ffi.NULL:
2547            sk_x509 = self._ffi.gc(sk_x509_ptr[0], self._lib.sk_X509_free)
2548            num = self._lib.sk_X509_num(sk_x509_ptr[0])
2549            for i in range(num):
2550                x509 = self._lib.sk_X509_value(sk_x509, i)
2551                self.openssl_assert(x509 != self._ffi.NULL)
2552                x509 = self._ffi.gc(x509, self._lib.X509_free)
2553                additional_certificates.append(_Certificate(self, x509))
2554
2555        return (key, cert, additional_certificates)
2556
2557    def serialize_key_and_certificates_to_pkcs12(
2558        self, name, key, cert, cas, encryption_algorithm
2559    ):
2560        password = None
2561        if name is not None:
2562            utils._check_bytes("name", name)
2563
2564        if isinstance(encryption_algorithm, serialization.NoEncryption):
2565            nid_cert = -1
2566            nid_key = -1
2567            pkcs12_iter = 0
2568            mac_iter = 0
2569        elif isinstance(
2570            encryption_algorithm, serialization.BestAvailableEncryption
2571        ):
2572            # PKCS12 encryption is hopeless trash and can never be fixed.
2573            # This is the least terrible option.
2574            nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
2575            nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
2576            # At least we can set this higher than OpenSSL's default
2577            pkcs12_iter = 20000
2578            # mac_iter chosen for compatibility reasons, see:
2579            # https://www.openssl.org/docs/man1.1.1/man3/PKCS12_create.html
2580            # Did we mention how lousy PKCS12 encryption is?
2581            mac_iter = 1
2582            password = encryption_algorithm.password
2583        else:
2584            raise ValueError("Unsupported key encryption type")
2585
2586        if cas is None or len(cas) == 0:
2587            sk_x509 = self._ffi.NULL
2588        else:
2589            sk_x509 = self._lib.sk_X509_new_null()
2590            sk_x509 = self._ffi.gc(sk_x509, self._lib.sk_X509_free)
2591
2592            # reverse the list when building the stack so that they're encoded
2593            # in the order they were originally provided. it is a mystery
2594            for ca in reversed(cas):
2595                res = self._lib.sk_X509_push(sk_x509, ca._x509)
2596                backend.openssl_assert(res >= 1)
2597
2598        with self._zeroed_null_terminated_buf(password) as password_buf:
2599            with self._zeroed_null_terminated_buf(name) as name_buf:
2600                p12 = self._lib.PKCS12_create(
2601                    password_buf,
2602                    name_buf,
2603                    key._evp_pkey if key else self._ffi.NULL,
2604                    cert._x509 if cert else self._ffi.NULL,
2605                    sk_x509,
2606                    nid_key,
2607                    nid_cert,
2608                    pkcs12_iter,
2609                    mac_iter,
2610                    0,
2611                )
2612
2613        self.openssl_assert(p12 != self._ffi.NULL)
2614        p12 = self._ffi.gc(p12, self._lib.PKCS12_free)
2615
2616        bio = self._create_mem_bio_gc()
2617        res = self._lib.i2d_PKCS12_bio(bio, p12)
2618        self.openssl_assert(res > 0)
2619        return self._read_mem_bio(bio)
2620
2621    def poly1305_supported(self):
2622        if self._fips_enabled:
2623            return False
2624        return self._lib.Cryptography_HAS_POLY1305 == 1
2625
2626    def create_poly1305_ctx(self, key):
2627        utils._check_byteslike("key", key)
2628        if len(key) != _POLY1305_KEY_SIZE:
2629            raise ValueError("A poly1305 key is 32 bytes long")
2630
2631        return _Poly1305Context(self, key)
2632
2633    def load_pem_pkcs7_certificates(self, data):
2634        utils._check_bytes("data", data)
2635        bio = self._bytes_to_bio(data)
2636        p7 = self._lib.PEM_read_bio_PKCS7(
2637            bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
2638        )
2639        if p7 == self._ffi.NULL:
2640            self._consume_errors()
2641            raise ValueError("Unable to parse PKCS7 data")
2642
2643        p7 = self._ffi.gc(p7, self._lib.PKCS7_free)
2644        return self._load_pkcs7_certificates(p7)
2645
2646    def load_der_pkcs7_certificates(self, data):
2647        utils._check_bytes("data", data)
2648        bio = self._bytes_to_bio(data)
2649        p7 = self._lib.d2i_PKCS7_bio(bio.bio, self._ffi.NULL)
2650        if p7 == self._ffi.NULL:
2651            self._consume_errors()
2652            raise ValueError("Unable to parse PKCS7 data")
2653
2654        p7 = self._ffi.gc(p7, self._lib.PKCS7_free)
2655        return self._load_pkcs7_certificates(p7)
2656
2657    def _load_pkcs7_certificates(self, p7):
2658        nid = self._lib.OBJ_obj2nid(p7.type)
2659        self.openssl_assert(nid != self._lib.NID_undef)
2660        if nid != self._lib.NID_pkcs7_signed:
2661            raise UnsupportedAlgorithm(
2662                "Only basic signed structures are currently supported. NID"
2663                " for this data was {}".format(nid),
2664                _Reasons.UNSUPPORTED_SERIALIZATION,
2665            )
2666
2667        sk_x509 = p7.d.sign.cert
2668        num = self._lib.sk_X509_num(sk_x509)
2669        certs = []
2670        for i in range(num):
2671            x509 = self._lib.sk_X509_value(sk_x509, i)
2672            self.openssl_assert(x509 != self._ffi.NULL)
2673            res = self._lib.X509_up_ref(x509)
2674            # When OpenSSL is less than 1.1.0 up_ref returns the current
2675            # refcount. On 1.1.0+ it returns 1 for success.
2676            self.openssl_assert(res >= 1)
2677            x509 = self._ffi.gc(x509, self._lib.X509_free)
2678            certs.append(_Certificate(self, x509))
2679
2680        return certs
2681
2682    def pkcs7_sign(self, builder, encoding, options):
2683        bio = self._bytes_to_bio(builder._data)
2684        init_flags = self._lib.PKCS7_PARTIAL
2685        final_flags = 0
2686
2687        if len(builder._additional_certs) == 0:
2688            certs = self._ffi.NULL
2689        else:
2690            certs = self._lib.sk_X509_new_null()
2691            certs = self._ffi.gc(certs, self._lib.sk_X509_free)
2692            for cert in builder._additional_certs:
2693                res = self._lib.sk_X509_push(certs, cert._x509)
2694                self.openssl_assert(res >= 1)
2695
2696        if pkcs7.PKCS7Options.DetachedSignature in options:
2697            # Don't embed the data in the PKCS7 structure
2698            init_flags |= self._lib.PKCS7_DETACHED
2699            final_flags |= self._lib.PKCS7_DETACHED
2700
2701        # This just inits a structure for us. However, there
2702        # are flags we need to set, joy.
2703        p7 = self._lib.PKCS7_sign(
2704            self._ffi.NULL,
2705            self._ffi.NULL,
2706            certs,
2707            self._ffi.NULL,
2708            init_flags,
2709        )
2710        self.openssl_assert(p7 != self._ffi.NULL)
2711        p7 = self._ffi.gc(p7, self._lib.PKCS7_free)
2712        signer_flags = 0
2713        # These flags are configurable on a per-signature basis
2714        # but we've deliberately chosen to make the API only allow
2715        # setting it across all signatures for now.
2716        if pkcs7.PKCS7Options.NoCapabilities in options:
2717            signer_flags |= self._lib.PKCS7_NOSMIMECAP
2718        elif pkcs7.PKCS7Options.NoAttributes in options:
2719            signer_flags |= self._lib.PKCS7_NOATTR
2720
2721        if pkcs7.PKCS7Options.NoCerts in options:
2722            signer_flags |= self._lib.PKCS7_NOCERTS
2723
2724        for certificate, private_key, hash_algorithm in builder._signers:
2725            md = self._evp_md_non_null_from_algorithm(hash_algorithm)
2726            p7signerinfo = self._lib.PKCS7_sign_add_signer(
2727                p7, certificate._x509, private_key._evp_pkey, md, signer_flags
2728            )
2729            self.openssl_assert(p7signerinfo != self._ffi.NULL)
2730
2731        for option in options:
2732            # DetachedSignature, NoCapabilities, and NoAttributes are already
2733            # handled so we just need to check these last two options.
2734            if option is pkcs7.PKCS7Options.Text:
2735                final_flags |= self._lib.PKCS7_TEXT
2736            elif option is pkcs7.PKCS7Options.Binary:
2737                final_flags |= self._lib.PKCS7_BINARY
2738
2739        bio_out = self._create_mem_bio_gc()
2740        if encoding is serialization.Encoding.SMIME:
2741            # This finalizes the structure
2742            res = self._lib.SMIME_write_PKCS7(
2743                bio_out, p7, bio.bio, final_flags
2744            )
2745        elif encoding is serialization.Encoding.PEM:
2746            res = self._lib.PKCS7_final(p7, bio.bio, final_flags)
2747            self.openssl_assert(res == 1)
2748            res = self._lib.PEM_write_bio_PKCS7_stream(
2749                bio_out, p7, bio.bio, final_flags
2750            )
2751        else:
2752            assert encoding is serialization.Encoding.DER
2753            # We need to call finalize here becauase i2d_PKCS7_bio does not
2754            # finalize.
2755            res = self._lib.PKCS7_final(p7, bio.bio, final_flags)
2756            self.openssl_assert(res == 1)
2757            res = self._lib.i2d_PKCS7_bio(bio_out, p7)
2758        self.openssl_assert(res == 1)
2759        return self._read_mem_bio(bio_out)
2760
2761
2762class GetCipherByName(object):
2763    def __init__(self, fmt):
2764        self._fmt = fmt
2765
2766    def __call__(self, backend, cipher, mode):
2767        cipher_name = self._fmt.format(cipher=cipher, mode=mode).lower()
2768        return backend._lib.EVP_get_cipherbyname(cipher_name.encode("ascii"))
2769
2770
2771def _get_xts_cipher(backend, cipher, mode):
2772    cipher_name = "aes-{}-xts".format(cipher.key_size // 2)
2773    return backend._lib.EVP_get_cipherbyname(cipher_name.encode("ascii"))
2774
2775
2776backend = Backend()
2777