• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import calendar
2import datetime
3
4from base64 import b16encode
5from functools import partial
6from operator import __eq__, __ne__, __lt__, __le__, __gt__, __ge__
7
8from six import (
9    integer_types as _integer_types,
10    text_type as _text_type,
11    PY2 as _PY2,
12)
13
14from cryptography import utils, x509
15from cryptography.hazmat.primitives.asymmetric import dsa, rsa
16
17from OpenSSL._util import (
18    ffi as _ffi,
19    lib as _lib,
20    exception_from_error_queue as _exception_from_error_queue,
21    byte_string as _byte_string,
22    native as _native,
23    path_string as _path_string,
24    UNSPECIFIED as _UNSPECIFIED,
25    text_to_bytes_and_warn as _text_to_bytes_and_warn,
26    make_assert as _make_assert,
27)
28
# Public names exported by ``from OpenSSL.crypto import *``.
# NOTE(review): TYPE_DH and TYPE_EC are defined in this module but are not
# listed here -- confirm whether that omission is intentional.
__all__ = [
    "FILETYPE_PEM",
    "FILETYPE_ASN1",
    "FILETYPE_TEXT",
    "TYPE_RSA",
    "TYPE_DSA",
    "Error",
    "PKey",
    "get_elliptic_curves",
    "get_elliptic_curve",
    "X509Name",
    "X509Extension",
    "X509Req",
    "X509",
    "X509StoreFlags",
    "X509Store",
    "X509StoreContextError",
    "X509StoreContext",
    "load_certificate",
    "dump_certificate",
    "dump_publickey",
    "dump_privatekey",
    "Revoked",
    "CRL",
    "PKCS7",
    "PKCS12",
    "NetscapeSPKI",
    "load_publickey",
    "load_privatekey",
    "dump_certificate_request",
    "load_certificate_request",
    "sign",
    "verify",
    "dump_crl",
    "load_crl",
    "load_pkcs7_data",
    "load_pkcs12",
]
67
# File format selectors, re-exported from the SSL_FILETYPE_* values of the
# OpenSSL binding.
FILETYPE_PEM = _lib.SSL_FILETYPE_PEM
FILETYPE_ASN1 = _lib.SSL_FILETYPE_ASN1

# TODO This was an API mistake.  OpenSSL has no such constant.
FILETYPE_TEXT = 2 ** 16 - 1

# Key type identifiers, re-exported from the EVP_PKEY_* values of the OpenSSL
# binding.
TYPE_RSA = _lib.EVP_PKEY_RSA
TYPE_DSA = _lib.EVP_PKEY_DSA
TYPE_DH = _lib.EVP_PKEY_DH
TYPE_EC = _lib.EVP_PKEY_EC
78
79
class Error(Exception):
    """
    Raised when an `OpenSSL.crypto` API call fails.
    """
84
85
# ``_exception_from_error_queue`` with this module's ``Error`` class bound as
# its first argument; called with no arguments elsewhere in this module.
_raise_current_error = partial(_exception_from_error_queue, Error)
# Helper built by ``_make_assert`` for ``Error``; used in this module as
# ``_openssl_assert(condition)`` after calls into OpenSSL.
_openssl_assert = _make_assert(Error)
88
89
def _get_backend():
    """
    Return the ``cryptography`` OpenSSL backend object, importing it lazily.

    Importing the backend from cryptography has the side effect of activating
    the osrandom engine, which mutates the global state of OpenSSL and causes
    issues for programs that use subinterpreters or embed Python.  Keeping
    the import inside this function means the side effect is only triggered
    when _get_backend is actually called.
    """
    from cryptography.hazmat.backends.openssl.backend import (
        backend as openssl_backend,
    )

    return openssl_backend
101
102
def _untested_error(where):
    """
    Report an OpenSSL API failure that the test suite does not exercise.

    Because the failure path is untested, the future behavior of pyOpenSSL
    after it occurs is somewhat less predictable.

    :param where: Name of the OpenSSL API that failed; interpolated into the
        error message.
    :raise RuntimeError: Always.
    """
    message = "Unknown %s failure" % (where,)
    raise RuntimeError(message)
110
111
def _new_mem_buf(buffer=None):
    """
    Create an OpenSSL memory BIO that is released by the garbage collector.

    :param buffer: None for an empty BIO, or some bytes to preload into the
        BIO so that they can be read back out.
    :return: The new BIO, wrapped with ``_ffi.gc``.
    """
    if buffer is not None:
        backing = _ffi.new("char[]", buffer)
        bio = _lib.BIO_new_mem_buf(backing, len(buffer))

        # The default argument pins ``backing`` so the memory stays alive as
        # long as the bio is alive!
        def release(bio, ref=backing):
            return _lib.BIO_free(bio)

    else:
        bio = _lib.BIO_new(_lib.BIO_s_mem())
        release = _lib.BIO_free

    _openssl_assert(bio != _ffi.NULL)

    return _ffi.gc(bio, release)
136
137
def _bio_to_string(bio):
    """
    Return a Python byte string copy of the data held by an OpenSSL BIO.

    :param bio: The BIO whose memory contents are copied.
    """
    data_pointer = _ffi.new("char**")
    data_length = _lib.BIO_get_mem_data(bio, data_pointer)
    contents = _ffi.buffer(data_pointer[0], data_length)
    # Slicing the cffi buffer yields an independent copy of the bytes.
    return contents[:]
145
146
def _set_asn1_time(boundary, when):
    """
    Set the time value of an ASN1 time object.

    @param boundary: An ASN1_TIME pointer (or an object safely
        castable to that type) which will have its value set.
    @param when: A string representation of the desired time value.

    @raise TypeError: If C{when} is not a L{bytes} string.
    @raise ValueError: If C{when} does not represent a time in the required
        format (``ASN1_TIME_set_string`` returned 0).
    """
    if isinstance(when, bytes):
        if _lib.ASN1_TIME_set_string(boundary, when) == 0:
            raise ValueError("Invalid string")
        return

    raise TypeError("when must be a byte string")
167
168
def _get_asn1_time(timestamp):
    """
    Read the time value stored in an ASN1 time object.

    @param timestamp: An ASN1_GENERALIZEDTIME* (or an object safely castable to
        that type) from which the time value will be retrieved.

    @return: The time value from C{timestamp} as a L{bytes} string in a certain
        format.  Or C{None} if the object contains no time value.
    """
    as_string = _ffi.cast("ASN1_STRING*", timestamp)

    if _lib.ASN1_STRING_length(as_string) == 0:
        return None

    if _lib.ASN1_STRING_type(as_string) == _lib.V_ASN1_GENERALIZEDTIME:
        # Already in generalized form: hand the raw data straight back.
        return _ffi.string(_lib.ASN1_STRING_data(as_string))

    converted = _ffi.new("ASN1_GENERALIZEDTIME**")
    _lib.ASN1_TIME_to_generalizedtime(timestamp, converted)
    if converted[0] == _ffi.NULL:
        # This may happen:
        #   - if timestamp was not an ASN1_TIME
        #   - if allocating memory for the ASN1_GENERALIZEDTIME failed
        #   - if a copy of the time data from timestamp cannot be made for
        #     the newly allocated ASN1_GENERALIZEDTIME
        #
        # These are difficult to test.  cffi enforces the ASN1_TIME type.
        # Memory allocation failures are a pain to trigger
        # deterministically.
        return _untested_error("ASN1_TIME_to_generalizedtime")

    converted_string = _ffi.cast("ASN1_STRING*", converted[0])
    text = _ffi.string(_lib.ASN1_STRING_data(converted_string))
    # The converted object was allocated for this call, so free it here once
    # its data has been copied out.
    _lib.ASN1_GENERALIZEDTIME_free(converted[0])
    return text
208
209
class _X509NameInvalidator(object):
    """
    Track name objects so they can all be invalidated at once.

    Objects registered with :py:meth:`add` have their ``_name`` attribute
    deleted when :py:meth:`clear` is called.
    """

    def __init__(self):
        self._names = []

    def add(self, name):
        """
        Register *name* for invalidation by the next :py:meth:`clear`.

        :param name: An object carrying a ``_name`` attribute.
        """
        self._names.append(name)

    def clear(self):
        """
        Invalidate every tracked name and forget it.

        The tracked list is emptied so that calling ``clear`` again does not
        try to delete ``_name`` from an already-invalidated object (which
        would raise ``AttributeError``) and so that invalidated objects are
        no longer kept alive by this tracker.
        """
        # Swap the list out first so the tracker is empty even if a deletion
        # below fails part-way through.
        pending, self._names = self._names, []
        for name in pending:
            # Breaks the object, but also prevents UAF!
            del name._name
221
222
class PKey(object):
    """
    A class representing a DSA or RSA public key or key pair.
    """

    # Class-level defaults; instances created via ``PKey.__new__`` (bypassing
    # ``__init__``) see these values.
    _only_public = False
    _initialized = True

    def __init__(self):
        """
        Allocate an empty ``EVP_PKEY``; no key material is present until
        :py:meth:`generate_key` is called.
        """
        pkey = _lib.EVP_PKEY_new()
        _openssl_assert(pkey != _ffi.NULL)
        self._pkey = _ffi.gc(pkey, _lib.EVP_PKEY_free)
        self._initialized = False

    def to_cryptography_key(self):
        """
        Export as a ``cryptography`` key.

        :rtype: One of ``cryptography``'s `key interfaces`_.

        .. _key interfaces: https://cryptography.io/en/latest/hazmat/\
            primitives/asymmetric/rsa/#key-interfaces

        .. versionadded:: 16.1.0
        """
        backend = _get_backend()
        if self._only_public:
            return backend._evp_pkey_to_public_key(self._pkey)
        else:
            return backend._evp_pkey_to_private_key(self._pkey)

    @classmethod
    def from_cryptography_key(cls, crypto_key):
        """
        Construct based on a ``cryptography`` *crypto_key*.

        :param crypto_key: A ``cryptography`` key.
        :type crypto_key: One of ``cryptography``'s `key interfaces`_.

        :rtype: PKey

        :raises TypeError: If *crypto_key* is not an RSA or DSA key.

        .. versionadded:: 16.1.0
        """
        # Validate before constructing so a rejected argument does not cost
        # an EVP_PKEY allocation.
        if not isinstance(
            crypto_key,
            (
                rsa.RSAPublicKey,
                rsa.RSAPrivateKey,
                dsa.DSAPublicKey,
                dsa.DSAPrivateKey,
            ),
        ):
            raise TypeError("Unsupported key type")

        pkey = cls()
        pkey._pkey = crypto_key._evp_pkey
        if isinstance(crypto_key, (rsa.RSAPublicKey, dsa.DSAPublicKey)):
            pkey._only_public = True
        pkey._initialized = True
        return pkey

    def generate_key(self, type, bits):
        """
        Generate a key pair of the given type, with the given number of bits.

        This generates a key "into" this object.

        :param type: The key type.
        :type type: :py:data:`TYPE_RSA` or :py:data:`TYPE_DSA`
        :param bits: The number of bits.
        :type bits: :py:data:`int` ``>= 0``
        :raises TypeError: If :py:data:`type` or :py:data:`bits` isn't
            of the appropriate type.
        :raises ValueError: If the number of bits isn't an integer of
            the appropriate size.
        :raises Error: If :py:data:`type` is not a supported key type or
            OpenSSL fails to generate the key.
        :return: ``None``
        """
        if not isinstance(type, int):
            raise TypeError("type must be an integer")

        if not isinstance(bits, int):
            raise TypeError("bits must be an integer")

        if type == TYPE_RSA:
            if bits <= 0:
                raise ValueError("Invalid number of bits")

            exponent = _lib.BN_new()
            _openssl_assert(exponent != _ffi.NULL)
            exponent = _ffi.gc(exponent, _lib.BN_free)
            _openssl_assert(_lib.BN_set_word(exponent, _lib.RSA_F4) == 1)

            # Named ``rsa_key`` to avoid shadowing the ``rsa`` module import.
            rsa_key = _lib.RSA_new()
            _openssl_assert(rsa_key != _ffi.NULL)

            # The RSA structure is only owned by self._pkey once
            # EVP_PKEY_assign_RSA has succeeded; on any earlier failure it
            # must be freed here or it leaks.
            succeeded = (
                _lib.RSA_generate_key_ex(rsa_key, bits, exponent, _ffi.NULL)
                == 1
                and _lib.EVP_PKEY_assign_RSA(self._pkey, rsa_key) == 1
            )
            if not succeeded:
                _lib.RSA_free(rsa_key)
            _openssl_assert(succeeded)

        elif type == TYPE_DSA:
            # Named ``dsa_key`` to avoid shadowing the ``dsa`` module import.
            dsa_key = _lib.DSA_new()
            _openssl_assert(dsa_key != _ffi.NULL)

            dsa_key = _ffi.gc(dsa_key, _lib.DSA_free)
            res = _lib.DSA_generate_parameters_ex(
                dsa_key, bits, _ffi.NULL, 0, _ffi.NULL, _ffi.NULL, _ffi.NULL
            )
            _openssl_assert(res == 1)

            _openssl_assert(_lib.DSA_generate_key(dsa_key) == 1)
            _openssl_assert(_lib.EVP_PKEY_set1_DSA(self._pkey, dsa_key) == 1)
        else:
            raise Error("No such key type")

        self._initialized = True

    def check(self):
        """
        Check the consistency of an RSA private key.

        This is the Python equivalent of OpenSSL's ``RSA_check_key``.

        :return: ``True`` if key is consistent.

        :raise OpenSSL.crypto.Error: if the key is inconsistent.

        :raise TypeError: if the key is of a type which cannot be checked.
            Only RSA keys can currently be checked.
        """
        if self._only_public:
            raise TypeError("public key only")

        if _lib.EVP_PKEY_type(self.type()) != _lib.EVP_PKEY_RSA:
            raise TypeError("key type unsupported")

        rsa_key = _lib.EVP_PKEY_get1_RSA(self._pkey)
        rsa_key = _ffi.gc(rsa_key, _lib.RSA_free)
        result = _lib.RSA_check_key(rsa_key)
        if result == 1:
            return True
        _raise_current_error()

    def type(self):
        """
        Returns the type of the key

        :return: The type of the key.
        """
        return _lib.EVP_PKEY_id(self._pkey)

    def bits(self):
        """
        Returns the number of bits of the key

        :return: The number of bits of the key.
        """
        return _lib.EVP_PKEY_bits(self._pkey)
380
381
class _EllipticCurve(object):
    """
    One elliptic curve supported by the OpenSSL build in use.

    @cvar _curves: :py:obj:`None` until an attempt is made to load the curves.
        Thereafter, a :py:type:`set` containing :py:type:`_EllipticCurve`
        instances each of which represents one curve supported by the system.
    @type _curves: :py:type:`NoneType` or :py:type:`set`
    """

    _curves = None

    def __init__(self, lib, nid, name):
        """
        :param lib: The :py:mod:`cryptography` binding instance used to
            interface with OpenSSL.

        :param nid: The OpenSSL NID identifying the curve this object
            represents.
        :type nid: :py:class:`int`

        :param name: The OpenSSL short name identifying the curve this object
            represents.
        :type name: :py:class:`unicode`
        """
        self._lib = lib
        self._nid = nid
        self.name = name

    if not _PY2:
        # This only necessary on Python 3.  Moreover, it is broken on Python 2.
        def __ne__(self, other):
            """
            Implement cooperation with the right-hand side argument of ``!=``.

            Python 3 seems to have dropped this cooperation in this very narrow
            circumstance.
            """
            if not isinstance(other, _EllipticCurve):
                return NotImplemented
            return super(_EllipticCurve, self).__ne__(other)

    def __repr__(self):
        return "<Curve %r>" % (self.name,)

    @classmethod
    def from_nid(cls, lib, nid):
        """
        Build a :py:class:`_EllipticCurve` for the given OpenSSL NID.

        :param lib: The OpenSSL library binding object.

        :param nid: The OpenSSL NID the resulting curve object will represent.
            This must be a curve NID (and not, for example, a hash NID) or
            subsequent operations will fail in unpredictable ways.
        :type nid: :py:class:`int`

        :return: The curve object.
        """
        short_name = _ffi.string(lib.OBJ_nid2sn(nid)).decode("ascii")
        return cls(lib, nid, short_name)

    @classmethod
    def _load_elliptic_curves(cls, lib):
        """
        Ask OpenSSL for its built-in curves.

        :param lib: The OpenSSL library binding object.

        :return: A :py:type:`set` of ``cls`` instances giving the names of the
            elliptic curves the underlying library supports.
        """
        # First call with a NULL buffer only reports how many curves exist.
        count = lib.EC_get_builtin_curves(_ffi.NULL, 0)
        curve_array = _ffi.new("EC_builtin_curve[]", count)
        # The return value on this call should be count again.  We
        # could check it to make sure but if it *isn't* then.. what could
        # we do? Abort the whole process, I suppose...?  -exarkun
        lib.EC_get_builtin_curves(curve_array, count)
        return {cls.from_nid(lib, entry.nid) for entry in curve_array}

    @classmethod
    def _get_elliptic_curves(cls, lib):
        """
        Return the supported curves, loading and caching them on first use.

        :param lib: The OpenSSL library binding object.

        :return: A :py:type:`set` of ``cls`` instances giving the names of the
            elliptic curves the underlying library supports.
        """
        if cls._curves is not None:
            return cls._curves
        cls._curves = cls._load_elliptic_curves(lib)
        return cls._curves

    def _to_EC_KEY(self):
        """
        Create a new OpenSSL EC_KEY structure initialized to use this curve.

        The structure is automatically garbage collected when the Python object
        is garbage collected.
        """
        ec_key = self._lib.EC_KEY_new_by_curve_name(self._nid)
        return _ffi.gc(ec_key, _lib.EC_KEY_free)
485
486
def get_elliptic_curves():
    """
    Return the set of elliptic curve objects supported by the OpenSSL build
    in use.

    Each curve object identifies itself through a :py:class:`unicode`
    ``name`` attribute.

    The curve objects are useful as values for the argument accepted by
    :py:meth:`Context.set_tmp_ecdh` to specify which elliptical curve should be
    used for ECDHE key exchange.
    """
    supported_curves = _EllipticCurve._get_elliptic_curves(_lib)
    return supported_curves
500
501
def get_elliptic_curve(name):
    """
    Look up one curve object by its name.

    See :py:func:`get_elliptic_curves` for information about curve objects.

    :param name: The OpenSSL short name identifying the curve object to
        retrieve.
    :type name: :py:class:`unicode`

    If the named curve is not supported then :py:class:`ValueError` is raised.
    """
    candidates = (
        curve for curve in get_elliptic_curves() if curve.name == name
    )
    match = next(candidates, None)
    if match is None:
        raise ValueError("unknown curve name", name)
    return match
518
519
class X509Name(object):
    """
    An X.509 Distinguished Name.

    :ivar countryName: The country of the entity.
    :ivar C: Alias for  :py:attr:`countryName`.

    :ivar stateOrProvinceName: The state or province of the entity.
    :ivar ST: Alias for :py:attr:`stateOrProvinceName`.

    :ivar localityName: The locality of the entity.
    :ivar L: Alias for :py:attr:`localityName`.

    :ivar organizationName: The organization name of the entity.
    :ivar O: Alias for :py:attr:`organizationName`.

    :ivar organizationalUnitName: The organizational unit of the entity.
    :ivar OU: Alias for :py:attr:`organizationalUnitName`

    :ivar commonName: The common name of the entity.
    :ivar CN: Alias for :py:attr:`commonName`.

    :ivar emailAddress: The e-mail address of the entity.
    """

    def __init__(self, name):
        """
        Create a new X509Name, copying the given X509Name instance.

        :param name: The name to copy.
        :type name: :py:class:`X509Name`
        """
        name = _lib.X509_NAME_dup(name._name)
        self._name = _ffi.gc(name, _lib.X509_NAME_free)

    def __setattr__(self, name, value):
        """
        Set a name component, replacing any existing entry with the same NID.

        Attribute names starting with ``_`` are stored as ordinary Python
        attributes.

        :raises TypeError: If *name* is not exactly a :py:class:`str`.
        :raises AttributeError: If OpenSSL does not recognise *name*.
        :raises Error: If OpenSSL rejects the new entry.
        """
        if name.startswith("_"):
            return super(X509Name, self).__setattr__(name, value)

        # Note: we really do not want str subclasses here, so we do not use
        # isinstance.
        if type(name) is not str:
            # Report the type of the offending *name*, not of the value.
            raise TypeError(
                "attribute name must be string, not '%.200s'"
                % (type(name).__name__,)
            )

        nid = _lib.OBJ_txt2nid(_byte_string(name))
        if nid == _lib.NID_undef:
            # Drain whatever the failed lookup left on the error queue.
            try:
                _raise_current_error()
            except Error:
                pass
            raise AttributeError("No such attribute")

        # If there's an old entry for this NID, remove it
        for i in range(_lib.X509_NAME_entry_count(self._name)):
            ent = _lib.X509_NAME_get_entry(self._name, i)
            ent_obj = _lib.X509_NAME_ENTRY_get_object(ent)
            ent_nid = _lib.OBJ_obj2nid(ent_obj)
            if nid == ent_nid:
                ent = _lib.X509_NAME_delete_entry(self._name, i)
                _lib.X509_NAME_ENTRY_free(ent)
                break

        if isinstance(value, _text_type):
            value = value.encode("utf-8")

        add_result = _lib.X509_NAME_add_entry_by_NID(
            self._name, nid, _lib.MBSTRING_UTF8, value, -1, -1, 0
        )
        if not add_result:
            _raise_current_error()

    def __getattr__(self, name):
        """
        Find attribute. An X509Name object has the following attributes:
        countryName (alias C), stateOrProvince (alias ST), locality (alias L),
        organization (alias O), organizationalUnit (alias OU), commonName
        (alias CN) and more...

        :return: The component decoded as text, or ``None`` if the name has
            no entry for it.
        :raises AttributeError: If OpenSSL does not recognise *name*.
        """
        nid = _lib.OBJ_txt2nid(_byte_string(name))
        if nid == _lib.NID_undef:
            # This is a bit weird.  OBJ_txt2nid indicated failure, but it seems
            # a lower level function, a2d_ASN1_OBJECT, also feels the need to
            # push something onto the error queue.  If we don't clean that up
            # now, someone else will bump into it later and be quite confused.
            # See lp#314814.
            try:
                _raise_current_error()
            except Error:
                pass
            # ``object`` defines no ``__getattr__`` to delegate to, so raise
            # the AttributeError directly with a message naming the attribute.
            raise AttributeError("No such attribute '%s'" % (name,))

        entry_index = _lib.X509_NAME_get_index_by_NID(self._name, nid, -1)
        if entry_index == -1:
            return None

        entry = _lib.X509_NAME_get_entry(self._name, entry_index)
        data = _lib.X509_NAME_ENTRY_get_data(entry)

        result_buffer = _ffi.new("unsigned char**")
        data_length = _lib.ASN1_STRING_to_UTF8(result_buffer, data)
        _openssl_assert(data_length >= 0)

        try:
            result = _ffi.buffer(result_buffer[0], data_length)[:].decode(
                "utf-8"
            )
        finally:
            # XXX untested
            _lib.OPENSSL_free(result_buffer[0])
        return result

    def _cmp(op):
        # Build a rich-comparison method that applies *op* to the result of
        # X509_NAME_cmp and 0; non-X509Name operands yield NotImplemented.
        def f(self, other):
            if not isinstance(other, X509Name):
                return NotImplemented
            result = _lib.X509_NAME_cmp(self._name, other._name)
            return op(result, 0)

        return f

    __eq__ = _cmp(__eq__)
    __ne__ = _cmp(__ne__)

    __lt__ = _cmp(__lt__)
    __le__ = _cmp(__le__)

    __gt__ = _cmp(__gt__)
    __ge__ = _cmp(__ge__)

    def __repr__(self):
        """
        String representation of an X509Name
        """
        result_buffer = _ffi.new("char[]", 512)
        format_result = _lib.X509_NAME_oneline(
            self._name, result_buffer, len(result_buffer)
        )
        _openssl_assert(format_result != _ffi.NULL)

        return "<X509Name object '%s'>" % (
            _native(_ffi.string(result_buffer)),
        )

    def hash(self):
        """
        Return an integer representation of the first four bytes of the
        MD5 digest of the DER representation of the name.

        This is the Python equivalent of OpenSSL's ``X509_NAME_hash``.

        :return: The (integer) hash of this name.
        :rtype: :py:class:`int`
        """
        return _lib.X509_NAME_hash(self._name)

    def der(self):
        """
        Return the DER encoding of this name.

        :return: The DER encoded form of this name.
        :rtype: :py:class:`bytes`
        """
        result_buffer = _ffi.new("unsigned char**")
        encode_result = _lib.i2d_X509_NAME(self._name, result_buffer)
        _openssl_assert(encode_result >= 0)

        string_result = _ffi.buffer(result_buffer[0], encode_result)[:]
        _lib.OPENSSL_free(result_buffer[0])
        return string_result

    def get_components(self):
        """
        Returns the components of this name, as a sequence of 2-tuples.

        :return: The components of this name.
        :rtype: :py:class:`list` of ``name, value`` tuples.
        """
        result = []
        for i in range(_lib.X509_NAME_entry_count(self._name)):
            ent = _lib.X509_NAME_get_entry(self._name, i)

            fname = _lib.X509_NAME_ENTRY_get_object(ent)
            fval = _lib.X509_NAME_ENTRY_get_data(ent)

            nid = _lib.OBJ_obj2nid(fname)
            name = _lib.OBJ_nid2sn(nid)

            # ffi.string does not handle strings containing NULL bytes
            # (which may have been generated by old, broken software)
            value = _ffi.buffer(
                _lib.ASN1_STRING_data(fval), _lib.ASN1_STRING_length(fval)
            )[:]
            result.append((_ffi.string(name), value))

        return result
718
719
class X509Extension(object):
    """
    An X.509 v3 certificate extension.
    """

    def __init__(self, type_name, critical, value, subject=None, issuer=None):
        """
        Build an X509 extension from its textual configuration.

        :param type_name: The name of the type of extension_ to create.
        :type type_name: :py:data:`bytes`

        :param bool critical: A flag indicating whether this is a critical
            extension.

        :param value: The value of the extension.
        :type value: :py:data:`bytes`

        :param subject: Optional X509 certificate to use as subject.
        :type subject: :py:class:`X509`

        :param issuer: Optional X509 certificate to use as issuer.
        :type issuer: :py:class:`X509`

        .. _extension: https://www.openssl.org/docs/manmaster/man5/
            x509v3_config.html#STANDARD-EXTENSIONS
        """
        null = _ffi.NULL
        ctx = _ffi.new("X509V3_CTX*")

        # A context is necessary for any extension which uses the r2i
        # conversion method.  That is, X509V3_EXT_nconf may segfault if passed
        # a NULL ctx. Start off by initializing most of the fields to NULL.
        _lib.X509V3_set_ctx(ctx, null, null, null, null, 0)

        # We have no configuration database - but perhaps we should (some
        # extensions may require it).
        _lib.X509V3_set_ctx_nodb(ctx)

        # Initialize the subject and issuer, if appropriate.  ctx is a local,
        # and as far as I can tell none of the X509V3_* APIs invoked here steal
        # any references, so no need to mess with reference counts or
        # duplicates.
        if issuer is not None:
            if not isinstance(issuer, X509):
                raise TypeError("issuer must be an X509 instance")
            ctx.issuer_cert = issuer._x509
        if subject is not None:
            if not isinstance(subject, X509):
                raise TypeError("subject must be an X509 instance")
            ctx.subject_cert = subject._x509

        # There are other OpenSSL APIs which would let us pass in critical
        # separately, but they're harder to use, and since value is already
        # a pile of crappy junk smuggling a ton of utterly important
        # structured data, what's the point of trying to avoid nasty stuff
        # with strings? (However, X509V3_EXT_i2d in particular seems like
        # it would be a better API to invoke.  I do not know where to get
        # the ext_struc it desires for its last parameter, though.)
        conf_value = b"critical," + value if critical else value

        raw_extension = _lib.X509V3_EXT_nconf(null, ctx, type_name, conf_value)
        if raw_extension == null:
            _raise_current_error()
        self._extension = _ffi.gc(raw_extension, _lib.X509_EXTENSION_free)

    @property
    def _nid(self):
        # NID of the object identifying this extension.
        ext_object = _lib.X509_EXTENSION_get_object(self._extension)
        return _lib.OBJ_obj2nid(ext_object)

    # Labels used when rendering the general-name types handled directly by
    # _subjectAltNameString.
    _prefixes = {
        _lib.GEN_EMAIL: "email",
        _lib.GEN_DNS: "DNS",
        _lib.GEN_URI: "URI",
    }

    def _subjectAltNameString(self):
        # Render each general name in the extension and join them with ", ".
        decoded = _lib.X509V3_EXT_d2i(self._extension)
        names = _ffi.gc(
            _ffi.cast("GENERAL_NAMES*", decoded), _lib.GENERAL_NAMES_free
        )

        rendered = []
        for index in range(_lib.sk_GENERAL_NAME_num(names)):
            entry = _lib.sk_GENERAL_NAME_value(names, index)
            prefix = self._prefixes.get(entry.type)
            if prefix is None:
                # No label for this type: use OpenSSL's own rendering.
                bio = _new_mem_buf()
                _lib.GENERAL_NAME_print(bio, entry)
                rendered.append(_native(_bio_to_string(bio)))
            else:
                raw_text = _ffi.buffer(entry.d.ia5.data, entry.d.ia5.length)[:]
                rendered.append(prefix + ":" + _native(raw_text))
        return ", ".join(rendered)

    def __str__(self):
        """
        :return: a nice text representation of the extension
        """
        if self._nid == _lib.NID_subject_alt_name:
            return self._subjectAltNameString()

        output = _new_mem_buf()
        printed = _lib.X509V3_EXT_print(output, self._extension, 0, 0)
        _openssl_assert(printed != 0)

        return _native(_bio_to_string(output))

    def get_critical(self):
        """
        Returns the critical field of this X.509 extension.

        :return: The critical field.
        """
        critical_flag = _lib.X509_EXTENSION_get_critical(self._extension)
        return critical_flag

    def get_short_name(self):
        """
        Returns the short type name of this X.509 extension.

        The result is a byte string such as :py:const:`b"basicConstraints"`.

        :return: The short type name.
        :rtype: :py:data:`bytes`

        .. versionadded:: 0.12
        """
        ext_object = _lib.X509_EXTENSION_get_object(self._extension)
        short_name = _lib.OBJ_nid2sn(_lib.OBJ_obj2nid(ext_object))
        return _ffi.string(short_name)

    def get_data(self):
        """
        Returns the data of the X509 extension, encoded as ASN.1.

        :return: The ASN.1 encoded data of this X509 extension.
        :rtype: :py:data:`bytes`

        .. versionadded:: 0.12
        """
        octets = _lib.X509_EXTENSION_get_data(self._extension)
        as_string = _ffi.cast("ASN1_STRING*", octets)
        data_pointer = _lib.ASN1_STRING_data(as_string)
        data_length = _lib.ASN1_STRING_length(as_string)
        return _ffi.buffer(data_pointer, data_length)[:]
870
871
class X509Req(object):
    """
    An X.509 certificate signing request.
    """

    def __init__(self):
        req = _lib.X509_REQ_new()
        # Fail loudly instead of wrapping a NULL pointer.
        _openssl_assert(req != _ffi.NULL)
        self._req = _ffi.gc(req, _lib.X509_REQ_free)
        # Default to version 0.
        self.set_version(0)

    def to_cryptography(self):
        """
        Export as a ``cryptography`` certificate signing request.

        :rtype: ``cryptography.x509.CertificateSigningRequest``

        .. versionadded:: 17.1.0
        """
        from cryptography.hazmat.backends.openssl.x509 import (
            _CertificateSigningRequest,
        )

        backend = _get_backend()
        return _CertificateSigningRequest(backend, self._req)

    @classmethod
    def from_cryptography(cls, crypto_req):
        """
        Construct based on a ``cryptography`` *crypto_req*.

        :param crypto_req: A ``cryptography`` X.509 certificate signing request
        :type crypto_req: ``cryptography.x509.CertificateSigningRequest``

        :rtype: X509Req

        :raises TypeError: If *crypto_req* is not a
            ``cryptography.x509.CertificateSigningRequest``.

        .. versionadded:: 17.1.0
        """
        if not isinstance(crypto_req, x509.CertificateSigningRequest):
            raise TypeError("Must be a certificate signing request")

        req = cls()
        # Replace the freshly allocated request with the one wrapped by the
        # ``cryptography`` object.
        req._req = crypto_req._x509_req
        return req

    def set_pubkey(self, pkey):
        """
        Set the public key of the certificate signing request.

        :param pkey: The public key to use.
        :type pkey: :py:class:`PKey`

        :return: ``None``
        """
        set_result = _lib.X509_REQ_set_pubkey(self._req, pkey._pkey)
        _openssl_assert(set_result == 1)

    def get_pubkey(self):
        """
        Get the public key of the certificate signing request.

        :return: The public key.
        :rtype: :py:class:`PKey`
        """
        pkey = PKey.__new__(PKey)
        pkey._pkey = _lib.X509_REQ_get_pubkey(self._req)
        _openssl_assert(pkey._pkey != _ffi.NULL)
        pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free)
        pkey._only_public = True
        return pkey

    def set_version(self, version):
        """
        Set the version subfield (RFC 2459, section 4.1.2.1) of the certificate
        request.

        :param int version: The version number.
        :return: ``None``
        """
        set_result = _lib.X509_REQ_set_version(self._req, version)
        _openssl_assert(set_result == 1)

    def get_version(self):
        """
        Get the version subfield (RFC 2459, section 4.1.2.1) of the certificate
        request.

        :return: The value of the version subfield.
        :rtype: :py:class:`int`
        """
        return _lib.X509_REQ_get_version(self._req)

    def get_subject(self):
        """
        Return the subject of this certificate signing request.

        This creates a new :class:`X509Name` that wraps the underlying subject
        name field on the certificate signing request. Modifying it will modify
        the underlying signing request, and will have the effect of modifying
        any other :class:`X509Name` that refers to this subject.

        :return: The subject of this certificate signing request.
        :rtype: :class:`X509Name`
        """
        name = X509Name.__new__(X509Name)
        name._name = _lib.X509_REQ_get_subject_name(self._req)
        _openssl_assert(name._name != _ffi.NULL)

        # The name is owned by the X509Req structure.  As long as the X509Name
        # Python object is alive, keep the X509Req Python object alive.
        name._owner = self

        return name

    def add_extensions(self, extensions):
        """
        Add extensions to the certificate signing request.

        :param extensions: The X.509 extensions to add.
        :type extensions: iterable of :py:class:`X509Extension`
        :return: ``None``

        :raises ValueError: If one of the elements is not an
            :py:class:`X509Extension`.
        """
        stack = _lib.sk_X509_EXTENSION_new_null()
        _openssl_assert(stack != _ffi.NULL)

        # Only the stack container is freed; the extensions pushed onto it
        # remain owned by their X509Extension wrappers.
        stack = _ffi.gc(stack, _lib.sk_X509_EXTENSION_free)

        for ext in extensions:
            if not isinstance(ext, X509Extension):
                raise ValueError("One of the elements is not an X509Extension")

            # A push reports the new element count, or 0 when it failed.
            push_result = _lib.sk_X509_EXTENSION_push(stack, ext._extension)
            _openssl_assert(push_result > 0)

        add_result = _lib.X509_REQ_add_extensions(self._req, stack)
        _openssl_assert(add_result == 1)

    def get_extensions(self):
        """
        Get X.509 extensions in the certificate signing request.

        :return: The X.509 extensions in this request.
        :rtype: :py:class:`list` of :py:class:`X509Extension` objects.

        .. versionadded:: 0.15
        """
        native_exts_obj = _lib.X509_REQ_get_extensions(self._req)
        if native_exts_obj == _ffi.NULL:
            # No stack was returned (presumably the request carries no
            # extensions), so there is nothing to copy or to free.
            return []

        native_exts_obj = _ffi.gc(
            native_exts_obj,
            lambda x: _lib.sk_X509_EXTENSION_pop_free(
                x,
                _ffi.addressof(_lib._original_lib, "X509_EXTENSION_free"),
            ),
        )

        exts = []
        for i in range(_lib.sk_X509_EXTENSION_num(native_exts_obj)):
            ext = X509Extension.__new__(X509Extension)
            # Duplicate each entry so the returned wrapper owns its own copy
            # and does not depend on the lifetime of the temporary stack.
            extension = _lib.X509_EXTENSION_dup(
                _lib.sk_X509_EXTENSION_value(native_exts_obj, i)
            )
            ext._extension = _ffi.gc(extension, _lib.X509_EXTENSION_free)
            exts.append(ext)
        return exts

    def sign(self, pkey, digest):
        """
        Sign the certificate signing request with this key and digest type.

        :param pkey: The key pair to sign with.
        :type pkey: :py:class:`PKey`
        :param digest: The name of the message digest to use for the signature,
            e.g. :py:data:`b"sha256"`.
        :type digest: :py:class:`bytes`
        :return: ``None``

        :raises ValueError: If the key has only a public part, is
            uninitialized, or if the digest name is unknown.
        """
        if pkey._only_public:
            raise ValueError("Key has only public part")

        if not pkey._initialized:
            raise ValueError("Key is uninitialized")

        digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest))
        if digest_obj == _ffi.NULL:
            raise ValueError("No such digest method")

        sign_result = _lib.X509_REQ_sign(self._req, pkey._pkey, digest_obj)
        _openssl_assert(sign_result > 0)

    def verify(self, pkey):
        """
        Verifies the signature on this certificate signing request.

        :param PKey pkey: A public key.

        :return: The positive result reported by OpenSSL when the signature
            is correct (a true value).
        :rtype: int

        :raises TypeError: If *pkey* is not a :py:class:`PKey`.
        :raises OpenSSL.crypto.Error: If the signature is invalid or there is a
            problem verifying the signature.
        """
        if not isinstance(pkey, PKey):
            raise TypeError("pkey must be a PKey instance")

        result = _lib.X509_REQ_verify(self._req, pkey._pkey)
        if result <= 0:
            _raise_current_error()

        return result
1081
1082
class X509(object):
    """
    An X.509 certificate.
    """

    def __init__(self):
        x509 = _lib.X509_new()
        _openssl_assert(x509 != _ffi.NULL)
        self._x509 = _ffi.gc(x509, _lib.X509_free)

        # Track the X509Name wrappers handed out for the issuer and subject
        # so they can be cleared when the corresponding name is replaced.
        self._issuer_invalidator = _X509NameInvalidator()
        self._subject_invalidator = _X509NameInvalidator()

    @classmethod
    def _from_raw_x509_ptr(cls, x509):
        # Wrap an existing native X509 pointer without allocating a new
        # certificate; the pointer is released with X509_free on collection.
        cert = cls.__new__(cls)
        cert._x509 = _ffi.gc(x509, _lib.X509_free)
        cert._issuer_invalidator = _X509NameInvalidator()
        cert._subject_invalidator = _X509NameInvalidator()
        return cert

    def to_cryptography(self):
        """
        Export as a ``cryptography`` certificate.

        :rtype: ``cryptography.x509.Certificate``

        .. versionadded:: 17.1.0
        """
        from cryptography.hazmat.backends.openssl.x509 import _Certificate

        backend = _get_backend()
        return _Certificate(backend, self._x509)

    @classmethod
    def from_cryptography(cls, crypto_cert):
        """
        Construct based on a ``cryptography`` *crypto_cert*.

        :param crypto_cert: A ``cryptography`` X.509 certificate.
        :type crypto_cert: ``cryptography.x509.Certificate``

        :rtype: X509

        :raises TypeError: If *crypto_cert* is not a
            ``cryptography.x509.Certificate``.

        .. versionadded:: 17.1.0
        """
        if not isinstance(crypto_cert, x509.Certificate):
            raise TypeError("Must be a certificate")

        cert = cls()
        # Replace the freshly allocated certificate with the one wrapped by
        # the ``cryptography`` object.
        cert._x509 = crypto_cert._x509
        return cert

    def set_version(self, version):
        """
        Set the version number of the certificate. Note that the
        version value is zero-based, eg. a value of 0 is V1.

        :param version: The version number of the certificate.
        :type version: :py:class:`int`

        :return: ``None``

        :raises TypeError: If *version* is not an integer.
        """
        if not isinstance(version, int):
            raise TypeError("version must be an integer")

        # Check the result so a failed update does not pass silently.
        set_result = _lib.X509_set_version(self._x509, version)
        _openssl_assert(set_result == 1)

    def get_version(self):
        """
        Return the version number of the certificate.

        :return: The version number of the certificate.
        :rtype: :py:class:`int`
        """
        return _lib.X509_get_version(self._x509)

    def get_pubkey(self):
        """
        Get the public key of the certificate.

        :return: The public key.
        :rtype: :py:class:`PKey`

        :raises OpenSSL.crypto.Error: If no public key could be obtained.
        """
        pkey = PKey.__new__(PKey)
        pkey._pkey = _lib.X509_get_pubkey(self._x509)
        if pkey._pkey == _ffi.NULL:
            _raise_current_error()
        pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free)
        pkey._only_public = True
        return pkey

    def set_pubkey(self, pkey):
        """
        Set the public key of the certificate.

        :param pkey: The public key.
        :type pkey: :py:class:`PKey`

        :return: :py:data:`None`

        :raises TypeError: If *pkey* is not a :py:class:`PKey`.
        """
        if not isinstance(pkey, PKey):
            raise TypeError("pkey must be a PKey instance")

        set_result = _lib.X509_set_pubkey(self._x509, pkey._pkey)
        _openssl_assert(set_result == 1)

    def sign(self, pkey, digest):
        """
        Sign the certificate with this key and digest type.

        :param pkey: The key to sign with.
        :type pkey: :py:class:`PKey`

        :param digest: The name of the message digest to use.
        :type digest: :py:class:`bytes`

        :return: :py:data:`None`

        :raises TypeError: If *pkey* is not a :py:class:`PKey`.
        :raises ValueError: If the key has only a public part, is
            uninitialized, or if the digest name is unknown.
        """
        if not isinstance(pkey, PKey):
            raise TypeError("pkey must be a PKey instance")

        if pkey._only_public:
            raise ValueError("Key only has public part")

        if not pkey._initialized:
            raise ValueError("Key is uninitialized")

        evp_md = _lib.EVP_get_digestbyname(_byte_string(digest))
        if evp_md == _ffi.NULL:
            raise ValueError("No such digest method")

        sign_result = _lib.X509_sign(self._x509, pkey._pkey, evp_md)
        _openssl_assert(sign_result > 0)

    def get_signature_algorithm(self):
        """
        Return the signature algorithm used in the certificate.

        :return: The name of the algorithm.
        :rtype: :py:class:`bytes`

        :raises ValueError: If the signature algorithm is undefined.

        .. versionadded:: 0.13
        """
        algor = _lib.X509_get0_tbs_sigalg(self._x509)
        nid = _lib.OBJ_obj2nid(algor.algorithm)
        if nid == _lib.NID_undef:
            raise ValueError("Undefined signature algorithm")
        return _ffi.string(_lib.OBJ_nid2ln(nid))

    def digest(self, digest_name):
        """
        Return the digest of the X509 object.

        :param digest_name: The name of the digest algorithm to use.
        :type digest_name: :py:class:`bytes`

        :return: The digest of the object, formatted as
            :py:const:`b":"`-delimited hex pairs.
        :rtype: :py:class:`bytes`

        :raises ValueError: If the digest name is unknown.
        """
        digest = _lib.EVP_get_digestbyname(_byte_string(digest_name))
        if digest == _ffi.NULL:
            raise ValueError("No such digest method")

        # Provide the largest buffer any digest can need; OpenSSL reports the
        # number of bytes actually written through result_length.
        result_buffer = _ffi.new("unsigned char[]", _lib.EVP_MAX_MD_SIZE)
        result_length = _ffi.new("unsigned int[]", 1)
        result_length[0] = len(result_buffer)

        digest_result = _lib.X509_digest(
            self._x509, digest, result_buffer, result_length
        )
        _openssl_assert(digest_result == 1)

        return b":".join(
            [
                b16encode(ch).upper()
                for ch in _ffi.buffer(result_buffer, result_length[0])
            ]
        )

    def subject_name_hash(self):
        """
        Return the hash of the X509 subject.

        :return: The hash of the subject, as returned by
            ``X509_subject_name_hash``.
        :rtype: :py:class:`int`
        """
        return _lib.X509_subject_name_hash(self._x509)

    def set_serial_number(self, serial):
        """
        Set the serial number of the certificate.

        :param serial: The new serial number.
        :type serial: :py:class:`int`

        :return: :py:data:`None`

        :raises TypeError: If *serial* is not an integer.
        """
        if not isinstance(serial, _integer_types):
            raise TypeError("serial must be an integer")

        # NOTE(review): hex() of a negative integer starts with "-0x", so the
        # slice below leaves a non-hex "x" prefix; negative serials presumably
        # end up in the NULL branch further down -- confirm.
        hex_serial = hex(serial)[2:]
        if not isinstance(hex_serial, bytes):
            hex_serial = hex_serial.encode("ascii")

        bignum_serial = _ffi.new("BIGNUM**")

        # BN_hex2bn stores the result in &bignum.  Unless it doesn't feel like
        # it.  If bignum is still NULL after this call, then the return value
        # is actually the result.  I hope.  -exarkun
        small_serial = _lib.BN_hex2bn(bignum_serial, hex_serial)

        if bignum_serial[0] == _ffi.NULL:
            set_result = _lib.ASN1_INTEGER_set(
                _lib.X509_get_serialNumber(self._x509), small_serial
            )
            if set_result:
                # TODO Not tested
                _raise_current_error()
        else:
            asn1_serial = _lib.BN_to_ASN1_INTEGER(bignum_serial[0], _ffi.NULL)
            # The BIGNUM is no longer needed once it has been converted.
            _lib.BN_free(bignum_serial[0])
            if asn1_serial == _ffi.NULL:
                # TODO Not tested
                _raise_current_error()
            asn1_serial = _ffi.gc(asn1_serial, _lib.ASN1_INTEGER_free)
            set_result = _lib.X509_set_serialNumber(self._x509, asn1_serial)
            _openssl_assert(set_result == 1)

    def get_serial_number(self):
        """
        Return the serial number of this certificate.

        :return: The serial number.
        :rtype: int
        """
        asn1_serial = _lib.X509_get_serialNumber(self._x509)
        bignum_serial = _lib.ASN1_INTEGER_to_BN(asn1_serial, _ffi.NULL)
        # Both the BIGNUM and its hex rendering are released explicitly,
        # whatever happens while converting.
        try:
            hex_serial = _lib.BN_bn2hex(bignum_serial)
            try:
                hexstring_serial = _ffi.string(hex_serial)
                serial = int(hexstring_serial, 16)
                return serial
            finally:
                _lib.OPENSSL_free(hex_serial)
        finally:
            _lib.BN_free(bignum_serial)

    def gmtime_adj_notAfter(self, amount):
        """
        Adjust the time stamp on which the certificate stops being valid.

        :param int amount: The number of seconds by which to adjust the
            timestamp.
        :return: ``None``

        :raises TypeError: If *amount* is not an integer.
        """
        if not isinstance(amount, int):
            raise TypeError("amount must be an integer")

        notAfter = _lib.X509_getm_notAfter(self._x509)
        _lib.X509_gmtime_adj(notAfter, amount)

    def gmtime_adj_notBefore(self, amount):
        """
        Adjust the timestamp on which the certificate starts being valid.

        :param int amount: The number of seconds by which to adjust the
            timestamp.
        :return: ``None``

        :raises TypeError: If *amount* is not an integer.
        """
        if not isinstance(amount, int):
            raise TypeError("amount must be an integer")

        notBefore = _lib.X509_getm_notBefore(self._x509)
        _lib.X509_gmtime_adj(notBefore, amount)

    def has_expired(self):
        """
        Check whether the certificate has expired.

        The ``notAfter`` timestamp is compared against the current UTC time.

        :return: ``True`` if the certificate has expired, ``False`` otherwise.
        :rtype: bool
        """
        time_string = _native(self.get_notAfter())
        not_after = datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ")

        return not_after < datetime.datetime.utcnow()

    def _get_boundary_time(self, which):
        # *which* is the binding accessor selecting the notBefore or notAfter
        # field of the certificate.
        return _get_asn1_time(which(self._x509))

    def get_notBefore(self):
        """
        Get the timestamp at which the certificate starts being valid.

        The timestamp is formatted as an ASN.1 TIME::

            YYYYMMDDhhmmssZ

        :return: A timestamp string, or ``None`` if there is none.
        :rtype: bytes or NoneType
        """
        return self._get_boundary_time(_lib.X509_getm_notBefore)

    def _set_boundary_time(self, which, when):
        # *which* is the binding accessor selecting the notBefore or notAfter
        # field of the certificate.
        return _set_asn1_time(which(self._x509), when)

    def set_notBefore(self, when):
        """
        Set the timestamp at which the certificate starts being valid.

        The timestamp is formatted as an ASN.1 TIME::

            YYYYMMDDhhmmssZ

        :param bytes when: A timestamp string.
        :return: ``None``
        """
        return self._set_boundary_time(_lib.X509_getm_notBefore, when)

    def get_notAfter(self):
        """
        Get the timestamp at which the certificate stops being valid.

        The timestamp is formatted as an ASN.1 TIME::

            YYYYMMDDhhmmssZ

        :return: A timestamp string, or ``None`` if there is none.
        :rtype: bytes or NoneType
        """
        return self._get_boundary_time(_lib.X509_getm_notAfter)

    def set_notAfter(self, when):
        """
        Set the timestamp at which the certificate stops being valid.

        The timestamp is formatted as an ASN.1 TIME::

            YYYYMMDDhhmmssZ

        :param bytes when: A timestamp string.
        :return: ``None``
        """
        return self._set_boundary_time(_lib.X509_getm_notAfter, when)

    def _get_name(self, which):
        # *which* is the binding accessor returning the issuer or subject
        # name of the certificate.
        name = X509Name.__new__(X509Name)
        name._name = which(self._x509)
        _openssl_assert(name._name != _ffi.NULL)

        # The name is owned by the X509 structure.  As long as the X509Name
        # Python object is alive, keep the X509 Python object alive.
        name._owner = self

        return name

    def _set_name(self, which, name):
        # *which* is the binding setter for the issuer or subject name.
        if not isinstance(name, X509Name):
            raise TypeError("name must be an X509Name")
        set_result = which(self._x509, name._name)
        _openssl_assert(set_result == 1)

    def get_issuer(self):
        """
        Return the issuer of this certificate.

        This creates a new :class:`X509Name` that wraps the underlying issuer
        name field on the certificate. Modifying it will modify the underlying
        certificate, and will have the effect of modifying any other
        :class:`X509Name` that refers to this issuer.

        :return: The issuer of this certificate.
        :rtype: :class:`X509Name`
        """
        name = self._get_name(_lib.X509_get_issuer_name)
        self._issuer_invalidator.add(name)
        return name

    def set_issuer(self, issuer):
        """
        Set the issuer of this certificate.

        :param issuer: The issuer.
        :type issuer: :py:class:`X509Name`

        :return: ``None``

        :raises TypeError: If *issuer* is not an :py:class:`X509Name`.
        """
        self._set_name(_lib.X509_set_issuer_name, issuer)
        self._issuer_invalidator.clear()

    def get_subject(self):
        """
        Return the subject of this certificate.

        This creates a new :class:`X509Name` that wraps the underlying subject
        name field on the certificate. Modifying it will modify the underlying
        certificate, and will have the effect of modifying any other
        :class:`X509Name` that refers to this subject.

        :return: The subject of this certificate.
        :rtype: :class:`X509Name`
        """
        name = self._get_name(_lib.X509_get_subject_name)
        self._subject_invalidator.add(name)
        return name

    def set_subject(self, subject):
        """
        Set the subject of this certificate.

        :param subject: The subject.
        :type subject: :py:class:`X509Name`

        :return: ``None``

        :raises TypeError: If *subject* is not an :py:class:`X509Name`.
        """
        self._set_name(_lib.X509_set_subject_name, subject)
        self._subject_invalidator.clear()

    def get_extension_count(self):
        """
        Get the number of extensions on this certificate.

        :return: The number of extensions.
        :rtype: :py:class:`int`

        .. versionadded:: 0.12
        """
        return _lib.X509_get_ext_count(self._x509)

    def add_extensions(self, extensions):
        """
        Add extensions to the certificate.

        :param extensions: The extensions to add.
        :type extensions: An iterable of :py:class:`X509Extension` objects.
        :return: ``None``

        :raises ValueError: If one of the elements is not an
            :py:class:`X509Extension`.
        :raises OpenSSL.crypto.Error: If OpenSSL fails to add an extension.
        """
        for ext in extensions:
            if not isinstance(ext, X509Extension):
                raise ValueError("One of the elements is not an X509Extension")

            # A location of -1 is passed for every extension.
            add_result = _lib.X509_add_ext(self._x509, ext._extension, -1)
            if not add_result:
                _raise_current_error()

    def get_extension(self, index):
        """
        Get a specific extension of the certificate by index.

        Extensions on a certificate are kept in order. The index
        parameter selects which extension will be returned.

        :param int index: The index of the extension to retrieve.
        :return: The extension at the specified index.
        :rtype: :py:class:`X509Extension`
        :raises IndexError: If the extension index was out of bounds.

        .. versionadded:: 0.12
        """
        ext = X509Extension.__new__(X509Extension)
        ext._extension = _lib.X509_get_ext(self._x509, index)
        if ext._extension == _ffi.NULL:
            raise IndexError("extension index out of bounds")

        # Hand out a duplicate that the wrapper owns and frees itself.
        extension = _lib.X509_EXTENSION_dup(ext._extension)
        ext._extension = _ffi.gc(extension, _lib.X509_EXTENSION_free)
        return ext
1554
1555
class X509StoreFlags(object):
    """
    Flags for X509 verification, used to change the behavior of
    :class:`X509Store`.

    Each attribute takes its value from the ``X509_V_FLAG_*`` constant of the
    same name exposed by the OpenSSL binding. Pass the flags, OR-ed together
    when several are needed, to :meth:`X509Store.set_flags`.

    See `OpenSSL Verification Flags`_ for details.

    .. _OpenSSL Verification Flags:
        https://www.openssl.org/docs/manmaster/man3/X509_VERIFY_PARAM_set_flags.html
    """

    # The values are read from the binding once, when this class body runs.
    CRL_CHECK = _lib.X509_V_FLAG_CRL_CHECK
    CRL_CHECK_ALL = _lib.X509_V_FLAG_CRL_CHECK_ALL
    IGNORE_CRITICAL = _lib.X509_V_FLAG_IGNORE_CRITICAL
    X509_STRICT = _lib.X509_V_FLAG_X509_STRICT
    ALLOW_PROXY_CERTS = _lib.X509_V_FLAG_ALLOW_PROXY_CERTS
    POLICY_CHECK = _lib.X509_V_FLAG_POLICY_CHECK
    EXPLICIT_POLICY = _lib.X509_V_FLAG_EXPLICIT_POLICY
    INHIBIT_MAP = _lib.X509_V_FLAG_INHIBIT_MAP
    NOTIFY_POLICY = _lib.X509_V_FLAG_NOTIFY_POLICY
    CHECK_SS_SIGNATURE = _lib.X509_V_FLAG_CHECK_SS_SIGNATURE
    CB_ISSUER_CHECK = _lib.X509_V_FLAG_CB_ISSUER_CHECK
1578
1579
class X509Store(object):
    """
    An X.509 store.

    An X.509 store is used to describe a context in which to verify a
    certificate. A description of a context may include a set of certificates
    to trust, a set of certificate revocation lists, verification flags and
    more.

    An X.509 store, being only a description, cannot be used by itself to
    verify a certificate. To carry out the actual verification process, see
    :class:`X509StoreContext`.
    """

    def __init__(self):
        store = _lib.X509_STORE_new()
        # Fail loudly instead of wrapping a NULL pointer.
        _openssl_assert(store != _ffi.NULL)
        self._store = _ffi.gc(store, _lib.X509_STORE_free)

    def add_cert(self, cert):
        """
        Adds a trusted certificate to this store.

        Adding a certificate with this method adds this certificate as a
        *trusted* certificate.

        :param X509 cert: The certificate to add to this store.

        :raises TypeError: If the certificate is not an :class:`X509`.

        :raises OpenSSL.crypto.Error: If OpenSSL was unhappy with your
            certificate.

        :return: ``None`` if the certificate was added successfully.
        """
        if not isinstance(cert, X509):
            raise TypeError()

        res = _lib.X509_STORE_add_cert(self._store, cert._x509)
        _openssl_assert(res == 1)

    def add_crl(self, crl):
        """
        Add a certificate revocation list to this store.

        The certificate revocation lists added to a store will only be used if
        the associated flags are configured to check certificate revocation
        lists.

        .. versionadded:: 16.1.0

        :param CRL crl: The certificate revocation list to add to this store.
        :return: ``None`` if the certificate revocation list was added
            successfully.
        """
        _openssl_assert(_lib.X509_STORE_add_crl(self._store, crl._crl) != 0)

    def set_flags(self, flags):
        """
        Set verification flags to this store.

        Verification flags can be combined by oring them together.

        .. note::

          Setting a verification flag sometimes requires clients to add
          additional information to the store, otherwise a suitable error will
          be raised.

          For example, in setting flags to enable CRL checking a
          suitable CRL must be added to the store otherwise an error will be
          raised.

        .. versionadded:: 16.1.0

        :param int flags: The verification flags to set on this store.
            See :class:`X509StoreFlags` for available constants.
        :return: ``None`` if the verification flags were successfully set.
        """
        _openssl_assert(_lib.X509_STORE_set_flags(self._store, flags) != 0)

    @staticmethod
    def _utc_timestamp(vfy_time):
        # Convert *vfy_time* to whole seconds since the Unix epoch, in UTC.
        #
        # ``utctimetuple`` applies the UTC offset of timezone-aware datetimes
        # and leaves naive ones untouched, whereas ``timetuple`` ignores
        # ``tzinfo`` altogether.  Plain ``date`` objects only provide
        # ``timetuple``, so that remains the fallback.
        to_tuple = getattr(vfy_time, "utctimetuple", vfy_time.timetuple)
        return calendar.timegm(to_tuple())

    def set_time(self, vfy_time):
        """
        Set the time against which the certificates are verified.

        Normally the current time is used.

        .. note::

          For example, you can determine if a certificate was valid at a given
          time.

        Naive datetimes are interpreted as UTC; timezone-aware datetimes are
        converted to UTC first.

        .. versionadded:: 17.0.0

        :param datetime vfy_time: The verification time to set on this store.
        :return: ``None`` if the verification time was successfully set.
        """
        param = _lib.X509_VERIFY_PARAM_new()
        # Fail loudly instead of wrapping a NULL pointer.
        _openssl_assert(param != _ffi.NULL)
        param = _ffi.gc(param, _lib.X509_VERIFY_PARAM_free)

        _lib.X509_VERIFY_PARAM_set_time(param, self._utc_timestamp(vfy_time))
        _openssl_assert(_lib.X509_STORE_set1_param(self._store, param) != 0)

    def load_locations(self, cafile, capath=None):
        """
        Let X509Store know where we can find trusted certificates for the
        certificate chain.  Note that the certificates have to be in PEM
        format.

        If *capath* is passed, it must be a directory prepared using the
        ``c_rehash`` tool included with OpenSSL.  Either, but not both, of
        *cafile* or *capath* may be ``None``.

        .. note::

          Both *cafile* and *capath* may be set simultaneously.

          Call this method multiple times to add more than one location.
          For example, CA certificates, and certificate revocation list bundles
          may be passed in *cafile* in subsequent calls to this method.

        .. versionadded:: 20.0

        :param cafile: In which file we can find the certificates (``bytes`` or
                       ``unicode``).
        :param capath: In which directory we can find the certificates
                       (``bytes`` or ``unicode``).

        :return: ``None`` if the locations were set successfully.

        :raises OpenSSL.crypto.Error: If both *cafile* and *capath* is ``None``
            or the locations could not be set for any reason.

        """
        # A missing location is handed to OpenSSL as a NULL pointer.
        if cafile is None:
            cafile = _ffi.NULL
        else:
            cafile = _path_string(cafile)

        if capath is None:
            capath = _ffi.NULL
        else:
            capath = _path_string(capath)

        load_result = _lib.X509_STORE_load_locations(
            self._store, cafile, capath
        )
        if not load_result:
            _raise_current_error()
1730
1731
class X509StoreContextError(Exception):
    """
    Raised when verifying a certificate with
    `OpenSSL.X509StoreContext.verify_certificate` fails.

    The exception message is passed on to :class:`Exception`; the offending
    certificate is kept on the instance.

    :ivar certificate: The certificate which caused the verification
        failure.
    :type certificate: :class:`X509`
    """

    def __init__(self, message, certificate):
        # Remember the certificate alongside the regular exception message.
        self.certificate = certificate
        super(X509StoreContextError, self).__init__(message)
1744
1745
class X509StoreContext(object):
    """
    An X.509 store context.

    An X.509 store context is used to carry out the actual verification process
    of a certificate in a described context. For describing such a context, see
    :class:`X509Store`.

    :ivar _store_ctx: The underlying X509_STORE_CTX structure used by this
        instance.  It is dynamically allocated and automatically garbage
        collected.
    :ivar _store: See the ``store`` ``__init__`` parameter.
    :ivar _cert: See the ``certificate`` ``__init__`` parameter.
    :ivar _chain: See the ``chain`` ``__init__`` parameter.
    :param X509Store store: The certificates which will be trusted for the
        purposes of any verifications.
    :param X509 certificate: The certificate to be verified.
    :param chain: List of untrusted certificates that may be used for building
        the certificate chain. May be ``None``.
    :type chain: :class:`list` of :class:`X509`
    """

    def __init__(self, store, certificate, chain=None):
        store_ctx = _lib.X509_STORE_CTX_new()
        self._store_ctx = _ffi.gc(store_ctx, _lib.X509_STORE_CTX_free)
        self._store = store
        self._cert = certificate
        self._chain = self._build_certificate_stack(chain)
        # Make the store context available for use after instantiating this
        # class by initializing it now. Per testing, subsequent calls to
        # :meth:`_init` have no adverse affect.
        self._init()

    @staticmethod
    def _build_certificate_stack(certificates):
        """
        Build a native ``STACK_OF(X509)`` from a list of :class:`X509`.

        :param certificates: The certificates to put on the stack, or
            ``None``.
        :return: ``_ffi.NULL`` when *certificates* is ``None`` or empty,
            otherwise a garbage-collected stack that holds its own reference
            to every certificate.
        :raises TypeError: If an element is not an :class:`X509` instance.
        """

        def cleanup(s):
            # Equivalent to sk_X509_pop_free, but we don't
            # currently have a CFFI binding for that available
            for i in range(_lib.sk_X509_num(s)):
                x = _lib.sk_X509_value(s, i)
                _lib.X509_free(x)
            _lib.sk_X509_free(s)

        if certificates is None or len(certificates) == 0:
            return _ffi.NULL

        stack = _lib.sk_X509_new_null()
        _openssl_assert(stack != _ffi.NULL)
        stack = _ffi.gc(stack, cleanup)

        for cert in certificates:
            if not isinstance(cert, X509):
                raise TypeError("One of the elements is not an X509 instance")

            # The stack owns one reference per element; ``cleanup`` drops it.
            _openssl_assert(_lib.X509_up_ref(cert._x509) > 0)
            if _lib.sk_X509_push(stack, cert._x509) <= 0:
                # The push failed, so give back the reference taken above.
                _lib.X509_free(cert._x509)
                _raise_current_error()

        return stack

    def _init(self):
        """
        Set up the store context for a subsequent verification operation.

        Calling this method more than once without first calling
        :meth:`_cleanup` will leak memory.
        """
        ret = _lib.X509_STORE_CTX_init(
            self._store_ctx, self._store._store, self._cert._x509, self._chain
        )
        if ret <= 0:
            _raise_current_error()

    def _cleanup(self):
        """
        Internally cleans up the store context.

        The store context can then be reused with a new call to :meth:`_init`.
        """
        _lib.X509_STORE_CTX_cleanup(self._store_ctx)

    def _exception_from_context(self):
        """
        Convert an OpenSSL native context error failure into a Python
        exception.

        When a call to native OpenSSL X509_verify_cert fails, additional
        information about the failure can be obtained from the store context.
        This must be called before :meth:`_cleanup`, while the context still
        holds the state of the failed verification.

        :return: An :class:`X509StoreContextError` whose message is the list
            ``[error code, error depth, error string]`` and whose
            ``certificate`` is a copy of the certificate that was current when
            the error occurred.
        """
        error_code = _lib.X509_STORE_CTX_get_error(self._store_ctx)
        errors = [
            error_code,
            _lib.X509_STORE_CTX_get_error_depth(self._store_ctx),
            _native(
                _ffi.string(_lib.X509_verify_cert_error_string(error_code))
            ),
        ]
        # A context error should always be associated with a certificate, so we
        # expect this call to never return :class:`None`.
        _x509 = _lib.X509_STORE_CTX_get_current_cert(self._store_ctx)
        _cert = _lib.X509_dup(_x509)
        pycert = X509._from_raw_x509_ptr(_cert)
        return X509StoreContextError(errors, pycert)

    def set_store(self, store):
        """
        Set the context's X.509 store.

        .. versionadded:: 0.15

        :param X509Store store: The store description which will be used for
            the purposes of any *future* verifications.
        """
        self._store = store

    def verify_certificate(self):
        """
        Verify a certificate in a context.

        .. versionadded:: 0.15

        :raises X509StoreContextError: If an error occurred when validating a
          certificate in the context. Sets ``certificate`` attribute to
          indicate which certificate caused the error.
        """
        # Always re-initialize the store context in case
        # :meth:`verify_certificate` is called multiple times.
        #
        # :meth:`_init` is called in :meth:`__init__` so _cleanup is called
        # before _init to ensure memory is not leaked.
        self._cleanup()
        self._init()
        try:
            ret = _lib.X509_verify_cert(self._store_ctx)
            if ret <= 0:
                # Build the exception while the context still carries the
                # failure details; the cleanup in ``finally`` runs afterwards.
                raise self._exception_from_context()
        finally:
            self._cleanup()

    def get_verified_chain(self):
        """
        Verify a certificate in a context and return the complete validated
        chain.

        :return: The validated chain as a :class:`list` of :class:`X509`.
        :raises X509StoreContextError: If an error occurred when validating a
          certificate in the context. Sets ``certificate`` attribute to
          indicate which certificate caused the error.

        .. versionadded:: 20.0
        """
        # Always re-initialize the store context in case
        # :meth:`verify_certificate` is called multiple times.
        #
        # :meth:`_init` is called in :meth:`__init__` so _cleanup is called
        # before _init to ensure memory is not leaked.
        self._cleanup()
        self._init()
        try:
            ret = _lib.X509_verify_cert(self._store_ctx)
            if ret <= 0:
                # Read the failure details before the context is cleaned up.
                raise self._exception_from_context()

            # Note: X509_STORE_CTX_get1_chain hands back a chain owned by the
            # caller, so it stays usable after the context is cleaned up.
            cert_stack = _lib.X509_STORE_CTX_get1_chain(self._store_ctx)
            _openssl_assert(cert_stack != _ffi.NULL)

            result = []
            for i in range(_lib.sk_X509_num(cert_stack)):
                cert = _lib.sk_X509_value(cert_stack, i)
                _openssl_assert(cert != _ffi.NULL)
                pycert = X509._from_raw_x509_ptr(cert)
                result.append(pycert)

            # Free the stack but not the members which are freed by the X509
            # class.
            _lib.sk_X509_free(cert_stack)
            return result
        finally:
            self._cleanup()
1925
1926
def load_certificate(type, buffer):
    """
    Parse an encoded certificate out of *buffer* and wrap it in an
    :class:`X509` object.

    :param type: The encoding of *buffer* (one of FILETYPE_PEM,
        FILETYPE_ASN1)

    :param bytes buffer: The encoded certificate.  Text input is first
        converted to bytes with the ASCII codec.

    :return: The X509 object
    :raises ValueError: If *type* is not one of the supported file types.
    """
    encoded = (
        buffer.encode("ascii") if isinstance(buffer, _text_type) else buffer
    )
    source = _new_mem_buf(encoded)

    if type == FILETYPE_PEM:
        cert_ptr = _lib.PEM_read_bio_X509(
            source, _ffi.NULL, _ffi.NULL, _ffi.NULL
        )
    elif type == FILETYPE_ASN1:
        cert_ptr = _lib.d2i_X509_bio(source, _ffi.NULL)
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    # A NULL result means the native parser rejected the input.
    if cert_ptr == _ffi.NULL:
        _raise_current_error()

    return X509._from_raw_x509_ptr(cert_ptr)
1954
1955
def dump_certificate(type, cert):
    """
    Serialize the certificate *cert* using the encoding selected by *type*.

    :param type: The output encoding (one of FILETYPE_PEM, FILETYPE_ASN1, or
        FILETYPE_TEXT)
    :param cert: The certificate to serialize
    :return: The buffer holding the serialized certificate
    :raises ValueError: If *type* is not one of the supported file types.
    """
    out = _new_mem_buf()

    if type == FILETYPE_PEM:
        status = _lib.PEM_write_bio_X509(out, cert._x509)
    elif type == FILETYPE_ASN1:
        status = _lib.i2d_X509_bio(out, cert._x509)
    elif type == FILETYPE_TEXT:
        status = _lib.X509_print_ex(out, cert._x509, 0, 0)
    else:
        raise ValueError(
            "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
            "FILETYPE_TEXT"
        )

    # Each of the writers above is expected to report exactly 1 on success.
    _openssl_assert(status == 1)
    return _bio_to_string(out)
1982
1983
def dump_publickey(type, pkey):
    """
    Serialize the public half of *pkey* into a buffer.

    :param type: The output encoding (one of :data:`FILETYPE_PEM` or
        :data:`FILETYPE_ASN1`).
    :param PKey pkey: The key whose public part is written out.
    :return: The buffer holding the serialized public key.
    :rtype: bytes
    :raises ValueError: If *type* is not one of the supported file types.
    """
    out = _new_mem_buf()

    if type == FILETYPE_PEM:
        status = _lib.PEM_write_bio_PUBKEY(out, pkey._pkey)
    elif type == FILETYPE_ASN1:
        status = _lib.i2d_PUBKEY_bio(out, pkey._pkey)
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    if status != 1:  # pragma: no cover
        _raise_current_error()

    return _bio_to_string(out)
2007
2008
def dump_privatekey(type, pkey, cipher=None, passphrase=None):
    """
    Serialize the private key *pkey* using the encoding selected by *type*.

    When *type* is :const:`FILETYPE_PEM` the output may additionally be
    encrypted with *cipher* and *passphrase*.

    :param type: The output encoding (one of :const:`FILETYPE_PEM`,
        :const:`FILETYPE_ASN1`, or :const:`FILETYPE_TEXT`)
    :param PKey pkey: The PKey to serialize
    :param cipher: (optional) for encrypted PEM output, the name of the
        cipher to use
    :param passphrase: (optional) for encrypted PEM output, either the
        passphrase itself or a callback that provides it.

    :return: The buffer holding the serialized key
    :rtype: bytes
    :raises TypeError: If *pkey* is not a :class:`PKey`, if *cipher* is given
        without *passphrase*, or if a non-RSA key is dumped as
        :const:`FILETYPE_TEXT`.
    :raises ValueError: If the cipher name is unknown or *type* is not one of
        the supported file types.
    """
    out = _new_mem_buf()

    if not isinstance(pkey, PKey):
        raise TypeError("pkey must be a PKey")

    # Without a cipher the key is written unencrypted.
    cipher_obj = _ffi.NULL
    if cipher is not None:
        if passphrase is None:
            raise TypeError(
                "if a value is given for cipher "
                "one must also be given for passphrase"
            )
        cipher_obj = _lib.EVP_get_cipherbyname(_byte_string(cipher))
        if cipher_obj == _ffi.NULL:
            raise ValueError("Invalid cipher name")

    passphrase_helper = _PassphraseHelper(type, passphrase)

    if type == FILETYPE_PEM:
        status = _lib.PEM_write_bio_PrivateKey(
            out,
            pkey._pkey,
            cipher_obj,
            _ffi.NULL,
            0,
            passphrase_helper.callback,
            passphrase_helper.callback_args,
        )
        passphrase_helper.raise_if_problem()
    elif type == FILETYPE_ASN1:
        status = _lib.i2d_PrivateKey_bio(out, pkey._pkey)
    elif type == FILETYPE_TEXT:
        key_kind = _lib.EVP_PKEY_id(pkey._pkey)
        if key_kind != _lib.EVP_PKEY_RSA:
            raise TypeError("Only RSA keys are supported for FILETYPE_TEXT")

        native_rsa = _lib.EVP_PKEY_get1_RSA(pkey._pkey)
        rsa_key = _ffi.gc(native_rsa, _lib.RSA_free)
        status = _lib.RSA_print(out, rsa_key, 0)
    else:
        raise ValueError(
            "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
            "FILETYPE_TEXT"
        )

    _openssl_assert(status != 0)

    return _bio_to_string(out)
2071
2072
class Revoked(object):
    """
    A certificate revocation.

    :ivar _revoked: The underlying X509_REVOKED structure.  It is freed
        automatically when this object is garbage collected.
    """

    # https://www.openssl.org/docs/manmaster/man5/x509v3_config.html#CRL-distribution-points
    # which differs from crl_reasons of crypto/x509v3/v3_enum.c that matches
    # OCSP_crl_reason_str.  We use the latter, just like the command line
    # program.
    #
    # The position of a name in this list is the numeric reason code that
    # :meth:`set_reason` stores in the extension.
    _crl_reasons = [
        b"unspecified",
        b"keyCompromise",
        b"CACompromise",
        b"affiliationChanged",
        b"superseded",
        b"cessationOfOperation",
        b"certificateHold",
        # b"removeFromCRL",
    ]

    def __init__(self):
        revoked = _lib.X509_REVOKED_new()
        self._revoked = _ffi.gc(revoked, _lib.X509_REVOKED_free)

    def set_serial(self, hex_str):
        """
        Set the serial number.

        The serial number is formatted as a hexadecimal number encoded in
        ASCII.

        :param bytes hex_str: The new serial number.

        :return: ``None``
        :raises ValueError: If *hex_str* cannot be parsed as hexadecimal.
        """
        bignum_serial = _ffi.gc(_lib.BN_new(), _lib.BN_free)
        # BN_hex2bn takes a BIGNUM**; point it at the BIGNUM allocated above.
        bignum_ptr = _ffi.new("BIGNUM**")
        bignum_ptr[0] = bignum_serial
        bn_result = _lib.BN_hex2bn(bignum_ptr, hex_str)
        if not bn_result:
            raise ValueError("bad hex string")

        asn1_serial = _ffi.gc(
            _lib.BN_to_ASN1_INTEGER(bignum_serial, _ffi.NULL),
            _lib.ASN1_INTEGER_free,
        )
        _lib.X509_REVOKED_set_serialNumber(self._revoked, asn1_serial)

    def get_serial(self):
        """
        Get the serial number.

        The serial number is formatted as a hexadecimal number encoded in
        ASCII.

        :return: The serial number.
        :rtype: bytes
        """
        bio = _new_mem_buf()

        asn1_int = _lib.X509_REVOKED_get0_serialNumber(self._revoked)
        _openssl_assert(asn1_int != _ffi.NULL)
        result = _lib.i2a_ASN1_INTEGER(bio, asn1_int)
        _openssl_assert(result >= 0)
        return _bio_to_string(bio)

    def _delete_reason(self):
        """
        Remove the first CRL reason extension from this revocation, if any.
        """
        for i in range(_lib.X509_REVOKED_get_ext_count(self._revoked)):
            ext = _lib.X509_REVOKED_get_ext(self._revoked, i)
            obj = _lib.X509_EXTENSION_get_object(ext)
            if _lib.OBJ_obj2nid(obj) == _lib.NID_crl_reason:
                _lib.X509_EXTENSION_free(ext)
                _lib.X509_REVOKED_delete_ext(self._revoked, i)
                break

    def set_reason(self, reason):
        """
        Set the reason of this revocation.

        If :data:`reason` is ``None``, delete the reason instead.

        :param reason: The reason string.  Case and spaces are ignored when
            it is matched against the supported reasons.
        :type reason: :class:`bytes` or :class:`NoneType`

        :return: ``None``
        :raises TypeError: If *reason* is neither ``None`` nor a byte string.
        :raises ValueError: If *reason* is not one of the supported reasons.

        .. seealso::

            :meth:`all_reasons`, which gives you a list of all supported
            reasons which you might pass to this method.
        """
        if reason is None:
            self._delete_reason()
        elif not isinstance(reason, bytes):
            raise TypeError("reason must be None or a byte string")
        else:
            reason = reason.lower().replace(b" ", b"")
            # ``list.index`` raises ValueError for an unsupported reason.
            reason_code = [r.lower() for r in self._crl_reasons].index(reason)

            new_reason_ext = _lib.ASN1_ENUMERATED_new()
            _openssl_assert(new_reason_ext != _ffi.NULL)
            new_reason_ext = _ffi.gc(new_reason_ext, _lib.ASN1_ENUMERATED_free)

            # ASN1_ENUMERATED_set reports an integer status (1 on success),
            # not a pointer, so compare it against 1 rather than NULL.
            set_result = _lib.ASN1_ENUMERATED_set(new_reason_ext, reason_code)
            _openssl_assert(set_result == 1)

            # Drop any existing reason so that only one is ever present.
            self._delete_reason()
            add_result = _lib.X509_REVOKED_add1_ext_i2d(
                self._revoked, _lib.NID_crl_reason, new_reason_ext, 0, 0
            )
            _openssl_assert(add_result == 1)

    def get_reason(self):
        """
        Get the reason of this revocation.

        :return: The reason, or ``None`` if there is none.
        :rtype: bytes or NoneType

        .. seealso::

            :meth:`all_reasons`, which gives you a list of all supported
            reasons this method might return.
        """
        for i in range(_lib.X509_REVOKED_get_ext_count(self._revoked)):
            ext = _lib.X509_REVOKED_get_ext(self._revoked, i)
            obj = _lib.X509_EXTENSION_get_object(ext)
            if _lib.OBJ_obj2nid(obj) == _lib.NID_crl_reason:
                bio = _new_mem_buf()

                print_result = _lib.X509V3_EXT_print(bio, ext, 0, 0)
                if not print_result:
                    # Fall back to printing the raw extension data.
                    print_result = _lib.M_ASN1_OCTET_STRING_print(
                        bio, _lib.X509_EXTENSION_get_data(ext)
                    )
                    _openssl_assert(print_result != 0)

                return _bio_to_string(bio)

        # No reason extension is present.
        return None

    def all_reasons(self):
        """
        Return a list of all the supported reason strings.

        This list is a copy; modifying it does not change the supported reason
        strings.

        :return: A list of reason strings.
        :rtype: :class:`list` of :class:`bytes`
        """
        return self._crl_reasons[:]

    def set_rev_date(self, when):
        """
        Set the revocation timestamp.

        :param bytes when: The timestamp of the revocation,
            as ASN.1 TIME.
        :return: ``None``
        """
        dt = _lib.X509_REVOKED_get0_revocationDate(self._revoked)
        return _set_asn1_time(dt, when)

    def get_rev_date(self):
        """
        Get the revocation timestamp.

        :return: The timestamp of the revocation, as ASN.1 TIME.
        :rtype: bytes
        """
        dt = _lib.X509_REVOKED_get0_revocationDate(self._revoked)
        return _get_asn1_time(dt)
2244
2245
class CRL(object):
    """
    A certificate revocation list.

    :ivar _crl: The underlying X509_CRL structure.
    """

    def __init__(self):
        crl = _lib.X509_CRL_new()
        self._crl = _ffi.gc(crl, _lib.X509_CRL_free)

    def to_cryptography(self):
        """
        Export as a ``cryptography`` CRL.

        :rtype: ``cryptography.x509.CertificateRevocationList``

        .. versionadded:: 17.1.0
        """
        from cryptography.hazmat.backends.openssl.x509 import (
            _CertificateRevocationList,
        )

        backend = _get_backend()
        return _CertificateRevocationList(backend, self._crl)

    @classmethod
    def from_cryptography(cls, crypto_crl):
        """
        Construct based on a ``cryptography`` *crypto_crl*.

        :param crypto_crl: A ``cryptography`` certificate revocation list
        :type crypto_crl: ``cryptography.x509.CertificateRevocationList``

        :rtype: CRL
        :raises TypeError: If *crypto_crl* is not a
            ``cryptography.x509.CertificateRevocationList``.

        .. versionadded:: 17.1.0
        """
        if not isinstance(crypto_crl, x509.CertificateRevocationList):
            raise TypeError("Must be a certificate revocation list")

        crl = cls()
        crl._crl = crypto_crl._x509_crl
        return crl

    def get_revoked(self):
        """
        Return the revocations in this certificate revocation list.

        These revocations will be provided by value, not by reference.
        That means it's okay to mutate them: it won't affect this CRL.

        :return: The revocations in this CRL, or ``None`` if there are none.
        :rtype: :class:`tuple` of :class:`Revoked`, or ``None``
        """
        results = []
        revoked_stack = _lib.X509_CRL_get_REVOKED(self._crl)
        for i in range(_lib.sk_X509_REVOKED_num(revoked_stack)):
            revoked = _lib.sk_X509_REVOKED_value(revoked_stack, i)
            revoked_copy = _lib.Cryptography_X509_REVOKED_dup(revoked)
            # Bypass ``Revoked.__init__`` so no second native structure is
            # allocated; the copy made above is adopted instead.
            pyrev = Revoked.__new__(Revoked)
            pyrev._revoked = _ffi.gc(revoked_copy, _lib.X509_REVOKED_free)
            results.append(pyrev)
        if results:
            return tuple(results)
        return None

    def add_revoked(self, revoked):
        """
        Add a revoked (by value not reference) to the CRL structure

        This revocation will be added by value, not by reference. That
        means it's okay to mutate it after adding: it won't affect
        this CRL.

        :param Revoked revoked: The new revocation.
        :return: ``None``
        """
        copy = _lib.Cryptography_X509_REVOKED_dup(revoked._revoked)
        _openssl_assert(copy != _ffi.NULL)

        add_result = _lib.X509_CRL_add0_revoked(self._crl, copy)
        _openssl_assert(add_result != 0)

    def get_issuer(self):
        """
        Get the CRL's issuer.

        .. versionadded:: 16.1.0

        :return: A copy of the issuer name.
        :rtype: X509Name
        """
        _issuer = _lib.X509_NAME_dup(_lib.X509_CRL_get_issuer(self._crl))
        _openssl_assert(_issuer != _ffi.NULL)
        _issuer = _ffi.gc(_issuer, _lib.X509_NAME_free)
        issuer = X509Name.__new__(X509Name)
        issuer._name = _issuer
        return issuer

    def set_version(self, version):
        """
        Set the CRL version.

        .. versionadded:: 16.1.0

        :param int version: The version of the CRL.
        :return: ``None``
        """
        _openssl_assert(_lib.X509_CRL_set_version(self._crl, version) != 0)

    def _set_boundary_time(self, which, when):
        """
        Set one of the CRL's time fields.

        :param which: A function that takes the native CRL and returns the
            time object to modify.
        :param bytes when: A timestamp string.
        """
        return _set_asn1_time(which(self._crl), when)

    def set_lastUpdate(self, when):
        """
        Set when the CRL was last updated.

        The timestamp is formatted as an ASN.1 TIME::

            YYYYMMDDhhmmssZ

        .. versionadded:: 16.1.0

        :param bytes when: A timestamp string.
        :return: ``None``
        """
        return self._set_boundary_time(_lib.X509_CRL_get_lastUpdate, when)

    def set_nextUpdate(self, when):
        """
        Set when the CRL will next be updated.

        The timestamp is formatted as an ASN.1 TIME::

            YYYYMMDDhhmmssZ

        .. versionadded:: 16.1.0

        :param bytes when: A timestamp string.
        :return: ``None``
        """
        return self._set_boundary_time(_lib.X509_CRL_get_nextUpdate, when)

    def sign(self, issuer_cert, issuer_key, digest):
        """
        Sign the CRL.

        Signing a CRL enables clients to associate the CRL itself with an
        issuer. Before a CRL is meaningful to other OpenSSL functions, it must
        be signed by an issuer.

        This method implicitly sets the issuer's name based on the issuer
        certificate and private key used to sign the CRL.

        .. versionadded:: 16.1.0

        :param X509 issuer_cert: The issuer's certificate.
        :param PKey issuer_key: The issuer's private key.
        :param bytes digest: The digest method to sign the CRL with.
        """
        digest_obj = _lib.EVP_get_digestbyname(digest)
        _openssl_assert(digest_obj != _ffi.NULL)
        _lib.X509_CRL_set_issuer_name(
            self._crl, _lib.X509_get_subject_name(issuer_cert._x509)
        )
        _lib.X509_CRL_sort(self._crl)
        result = _lib.X509_CRL_sign(self._crl, issuer_key._pkey, digest_obj)
        _openssl_assert(result != 0)

    def export(
        self, cert, key, type=FILETYPE_PEM, days=100, digest=_UNSPECIFIED
    ):
        """
        Export the CRL as a string.

        Before dumping, this sets the CRL's last-update time to now, its
        next-update time to *days* from now and its issuer to the subject of
        *cert*, and then signs the CRL with *key*.

        :param X509 cert: The certificate used to sign the CRL.
        :param PKey key: The key used to sign the CRL.
        :param int type: The export format, either :data:`FILETYPE_PEM`,
            :data:`FILETYPE_ASN1`, or :data:`FILETYPE_TEXT`.
        :param int days: The number of days until the next update of this CRL.
        :param bytes digest: The name of the message digest to use (eg
            ``b"sha256"``).
        :rtype: bytes
        :raises TypeError: If an argument has the wrong type or *digest* is
            not provided.
        :raises ValueError: If *digest* does not name a known digest.
        """

        if not isinstance(cert, X509):
            raise TypeError("cert must be an X509 instance")
        if not isinstance(key, PKey):
            raise TypeError("key must be a PKey instance")
        if not isinstance(type, int):
            raise TypeError("type must be an integer")

        if digest is _UNSPECIFIED:
            raise TypeError("digest must be provided")

        digest_obj = _lib.EVP_get_digestbyname(digest)
        if digest_obj == _ffi.NULL:
            raise ValueError("No such digest method")

        # A scratch time object to give different values to different CRL
        # fields.  It is reused for both fields below, so it remains ours to
        # release; tie its lifetime to the Python object so it is not leaked.
        sometime = _lib.ASN1_TIME_new()
        _openssl_assert(sometime != _ffi.NULL)
        sometime = _ffi.gc(sometime, _lib.ASN1_TIME_free)

        _lib.X509_gmtime_adj(sometime, 0)
        _lib.X509_CRL_set_lastUpdate(self._crl, sometime)

        _lib.X509_gmtime_adj(sometime, days * 24 * 60 * 60)
        _lib.X509_CRL_set_nextUpdate(self._crl, sometime)

        _lib.X509_CRL_set_issuer_name(
            self._crl, _lib.X509_get_subject_name(cert._x509)
        )

        sign_result = _lib.X509_CRL_sign(self._crl, key._pkey, digest_obj)
        if not sign_result:
            _raise_current_error()

        return dump_crl(type, self)
2465
2466
class PKCS7(object):
    """
    A PKCS #7 structure.

    This class defines no ``__init__``; its methods read the native
    structure from the ``_pkcs7`` attribute.
    """

    def type_is_signed(self):
        """
        Report whether this is an NID_pkcs7_signed object.

        :return: True if the PKCS7 is of type signed
        """
        flag = _lib.PKCS7_type_is_signed(self._pkcs7)
        return bool(flag)

    def type_is_enveloped(self):
        """
        Report whether this is an NID_pkcs7_enveloped object.

        :returns: True if the PKCS7 is of type enveloped
        """
        flag = _lib.PKCS7_type_is_enveloped(self._pkcs7)
        return bool(flag)

    def type_is_signedAndEnveloped(self):
        """
        Report whether this is an NID_pkcs7_signedAndEnveloped object.

        :returns: True if the PKCS7 is of type signedAndEnveloped
        """
        flag = _lib.PKCS7_type_is_signedAndEnveloped(self._pkcs7)
        return bool(flag)

    def type_is_data(self):
        """
        Report whether this is an NID_pkcs7_data object.

        :return: True if the PKCS7 is of type data
        """
        flag = _lib.PKCS7_type_is_data(self._pkcs7)
        return bool(flag)

    def get_type_name(self):
        """
        Look up the short name of this PKCS7 structure's type.

        :return: A string with the typename
        """
        short_name = _lib.OBJ_nid2sn(_lib.OBJ_obj2nid(self._pkcs7.type))
        return _ffi.string(short_name)
2509
2510
class PKCS12(object):
    """
    A PKCS #12 archive.

    :ivar _pkey: The private key, or ``None``.
    :ivar _cert: The certificate, or ``None``.
    :ivar _cacerts: A list of CA certificates, or ``None``.
    :ivar _friendlyname: The friendly name as bytes, or ``None``.
    """

    def __init__(self):
        self._pkey = None
        self._cert = None
        self._cacerts = None
        self._friendlyname = None

    def get_certificate(self):
        """
        Get the certificate in the PKCS #12 structure.

        :return: The certificate, or :py:const:`None` if there is none.
        :rtype: :py:class:`X509` or :py:const:`None`
        """
        return self._cert

    def set_certificate(self, cert):
        """
        Set the certificate in the PKCS #12 structure.

        :param cert: The new certificate, or :py:const:`None` to unset it.
        :type cert: :py:class:`X509` or :py:const:`None`

        :return: ``None``
        :raises TypeError: If *cert* is neither an :py:class:`X509` nor
            :py:const:`None`.
        """
        # ``None`` is the documented way to unset the certificate, and
        # :meth:`export` already copes with a missing certificate.
        if cert is not None and not isinstance(cert, X509):
            raise TypeError("cert must be an X509 instance")
        self._cert = cert

    def get_privatekey(self):
        """
        Get the private key in the PKCS #12 structure.

        :return: The private key, or :py:const:`None` if there is none.
        :rtype: :py:class:`PKey`
        """
        return self._pkey

    def set_privatekey(self, pkey):
        """
        Set the private key portion of the PKCS #12 structure.

        :param pkey: The new private key, or :py:const:`None` to unset it.
        :type pkey: :py:class:`PKey` or :py:const:`None`

        :return: ``None``
        :raises TypeError: If *pkey* is neither a :py:class:`PKey` nor
            :py:const:`None`.
        """
        # ``None`` is the documented way to unset the key, and
        # :meth:`export` already copes with a missing key.
        if pkey is not None and not isinstance(pkey, PKey):
            raise TypeError("pkey must be a PKey instance")
        self._pkey = pkey

    def get_ca_certificates(self):
        """
        Get the CA certificates in the PKCS #12 structure.

        :return: A tuple with the CA certificates in the chain, or
            :py:const:`None` if there are none.
        :rtype: :py:class:`tuple` of :py:class:`X509` or :py:const:`None`
        """
        if self._cacerts is not None:
            return tuple(self._cacerts)
        return None

    def set_ca_certificates(self, cacerts):
        """
        Replace or set the CA certificates within the PKCS12 object.

        :param cacerts: The new CA certificates, or :py:const:`None` to unset
            them.
        :type cacerts: An iterable of :py:class:`X509` or :py:const:`None`

        :return: ``None``
        :raises TypeError: If *cacerts* is not iterable or contains anything
            other than :py:class:`X509` instances.
        """
        if cacerts is None:
            self._cacerts = None
        else:
            # Materialize the iterable first so that nothing is stored unless
            # every element passes validation.
            cacerts = list(cacerts)
            for cert in cacerts:
                if not isinstance(cert, X509):
                    raise TypeError(
                        "iterable must only contain X509 instances"
                    )
            self._cacerts = cacerts

    def set_friendlyname(self, name):
        """
        Set the friendly name in the PKCS #12 structure.

        :param name: The new friendly name, or :py:const:`None` to unset.
        :type name: :py:class:`bytes` or :py:const:`None`

        :return: ``None``
        :raises TypeError: If *name* is neither a byte string nor
            :py:const:`None`.
        """
        if name is not None and not isinstance(name, bytes):
            raise TypeError(
                "name must be a byte string or None (not %r)" % (name,)
            )
        self._friendlyname = name

    def get_friendlyname(self):
        """
        Get the friendly name in the PKCS# 12 structure.

        :returns: The friendly name,  or :py:const:`None` if there is none.
        :rtype: :py:class:`bytes` or :py:const:`None`
        """
        return self._friendlyname

    def export(self, passphrase=None, iter=2048, maciter=1):
        """
        Dump a PKCS12 object as a string.

        For more information, see the :c:func:`PKCS12_create` man page.

        :param passphrase: The passphrase used to encrypt the structure. Unlike
            some other passphrase arguments, this *must* be a string, not a
            callback.
        :type passphrase: :py:data:`bytes`

        :param iter: Number of times to repeat the encryption step.
        :type iter: :py:data:`int`

        :param maciter: Number of times to repeat the MAC step.
        :type maciter: :py:data:`int`

        :return: The string representation of the PKCS #12 structure.
        :rtype: :py:data:`bytes`
        """
        passphrase = _text_to_bytes_and_warn("passphrase", passphrase)

        if self._cacerts is None:
            cacerts = _ffi.NULL
        else:
            # Only the stack itself is freed afterwards; the certificates it
            # points at stay owned by their X509 objects.
            cacerts = _lib.sk_X509_new_null()
            cacerts = _ffi.gc(cacerts, _lib.sk_X509_free)
            for cert in self._cacerts:
                _lib.sk_X509_push(cacerts, cert._x509)

        # Every component that is unset is handed to OpenSSL as NULL.
        if passphrase is None:
            passphrase = _ffi.NULL

        friendlyname = self._friendlyname
        if friendlyname is None:
            friendlyname = _ffi.NULL

        if self._pkey is None:
            pkey = _ffi.NULL
        else:
            pkey = self._pkey._pkey

        if self._cert is None:
            cert = _ffi.NULL
        else:
            cert = self._cert._x509

        pkcs12 = _lib.PKCS12_create(
            passphrase,
            friendlyname,
            pkey,
            cert,
            cacerts,
            _lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC,
            _lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC,
            iter,
            maciter,
            0,
        )
        if pkcs12 == _ffi.NULL:
            _raise_current_error()
        pkcs12 = _ffi.gc(pkcs12, _lib.PKCS12_free)

        bio = _new_mem_buf()
        _lib.i2d_PKCS12_bio(bio, pkcs12)
        return _bio_to_string(bio)
2690
2691
class NetscapeSPKI(object):
    """
    A Netscape SPKI object.
    """

    def __init__(self):
        spki = _lib.NETSCAPE_SPKI_new()
        self._spki = _ffi.gc(spki, _lib.NETSCAPE_SPKI_free)

    def sign(self, pkey, digest):
        """
        Sign this SPKI object with the given key and digest type.

        :param pkey: The private key to sign with.
        :type pkey: :py:class:`PKey`

        :param digest: The name of the message digest to use; it is looked
            up with :c:func:`EVP_get_digestbyname`.

        :return: ``None``

        :raises ValueError: If *pkey* holds only a public key, is
            uninitialized, or if *digest* names no known digest.
        """
        if pkey._only_public:
            raise ValueError("Key has only public part")

        if not pkey._initialized:
            raise ValueError("Key is uninitialized")

        digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest))
        if digest_obj == _ffi.NULL:
            raise ValueError("No such digest method")

        sign_result = _lib.NETSCAPE_SPKI_sign(
            self._spki, pkey._pkey, digest_obj
        )
        _openssl_assert(sign_result > 0)

    def verify(self, key):
        """
        Verify the signature on this SPKI object.

        :param PKey key: The public key that signature is supposedly from.

        :return: ``True`` if the signature is correct.
        :rtype: bool

        :raises OpenSSL.crypto.Error: If the signature is invalid, or there was
            a problem verifying the signature.
        """
        answer = _lib.NETSCAPE_SPKI_verify(self._spki, key._pkey)
        if answer <= 0:
            _raise_current_error()
        return True

    def b64_encode(self):
        """
        Generate a base64 encoded representation of this SPKI object.

        :return: The base64 encoded string.
        :rtype: :py:class:`bytes`
        """
        encoded = _lib.NETSCAPE_SPKI_b64_encode(self._spki)
        # Fail with an OpenSSL error rather than handing NULL to ffi.string.
        _openssl_assert(encoded != _ffi.NULL)
        try:
            return _ffi.string(encoded)
        finally:
            # The encoded buffer must be released even if the copy fails.
            _lib.OPENSSL_free(encoded)

    def get_pubkey(self):
        """
        Get the public key of this SPKI object.

        :return: The public key, marked as holding only the public part.
        :rtype: :py:class:`PKey`
        """
        pkey = PKey.__new__(PKey)
        pkey._pkey = _lib.NETSCAPE_SPKI_get_pubkey(self._spki)
        _openssl_assert(pkey._pkey != _ffi.NULL)
        pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free)
        pkey._only_public = True
        return pkey

    def set_pubkey(self, pkey):
        """
        Set the public key of this SPKI object.

        :param pkey: The public key
        :type pkey: :py:class:`PKey`
        :return: ``None``
        """
        set_result = _lib.NETSCAPE_SPKI_set_pubkey(self._spki, pkey._pkey)
        _openssl_assert(set_result == 1)
2780
2781
class _PassphraseHelper(object):
    """
    Adapt a passphrase, given either as a byte string or as a callable that
    produces one, to the ``pem_password_cb`` callback interface.

    Exceptions raised while the callback runs are collected and can be
    re-raised afterwards with :meth:`raise_if_problem`.
    """

    def __init__(self, type, passphrase, more_args=False, truncate=False):
        wants_encryption = passphrase is not None
        if wants_encryption and type != FILETYPE_PEM:
            raise ValueError(
                "only FILETYPE_PEM key format supports encryption"
            )
        self._passphrase = passphrase
        # When true, a callable passphrase is invoked with
        # (size, rwflag, userdata) instead of just (rwflag).
        self._more_args = more_args
        # When true, an over-long passphrase is cut to fit, not rejected.
        self._truncate = truncate
        # Exceptions captured inside the callback, oldest first.
        self._problems = []

    def _require_supported_passphrase(self):
        # Only byte strings and callables can serve as a passphrase source.
        candidate = self._passphrase
        if not (isinstance(candidate, bytes) or callable(candidate)):
            raise TypeError(
                "Last argument must be a byte string or a callable."
            )

    @property
    def callback(self):
        """
        The ``pem_password_cb`` callback to pass on, or NULL when no
        passphrase was supplied.
        """
        if self._passphrase is None:
            return _ffi.NULL
        self._require_supported_passphrase()
        return _ffi.callback("pem_password_cb", self._read_passphrase)

    @property
    def callback_args(self):
        """
        The user-data argument accompanying :attr:`callback`; always NULL
        once the passphrase type has been validated.
        """
        if self._passphrase is not None:
            self._require_supported_passphrase()
        return _ffi.NULL

    def raise_if_problem(self, exceptionType=Error):
        """
        Re-raise the oldest exception captured by the callback, if any.
        """
        if not self._problems:
            return

        # Flush the OpenSSL error queue
        try:
            _exception_from_error_queue(exceptionType)
        except exceptionType:
            pass

        raise self._problems.pop(0)

    def _read_passphrase(self, buf, size, rwflag, userdata):
        """
        Callback body: copy the passphrase into *buf* and return its length,
        or record the failure and return 0.
        """
        try:
            secret = self._passphrase
            if callable(secret):
                if self._more_args:
                    secret = secret(size, rwflag, userdata)
                else:
                    secret = secret(rwflag)
            if not isinstance(secret, bytes):
                raise ValueError("Bytes expected")
            if len(secret) > size:
                if not self._truncate:
                    raise ValueError(
                        "passphrase returned by callback is too long"
                    )
                secret = secret[:size]
            written = len(secret)
            # Copy one-byte slices so each element assigned is a byte string.
            for offset in range(written):
                buf[offset] = secret[offset : offset + 1]
            return written
        except Exception as e:
            # Defer the error; it is surfaced later by raise_if_problem().
            self._problems.append(e)
            return 0
2850
2851
def load_publickey(type, buffer):
    """
    Load a public key from a buffer.

    :param type: The file type (one of :data:`FILETYPE_PEM`,
        :data:`FILETYPE_ASN1`).
    :param buffer: The buffer the key is stored in.
    :type buffer: A Python string object, either unicode or bytestring.
        Unicode input is encoded as ASCII first.
    :return: The PKey object, marked as holding only the public part.
    :rtype: :class:`PKey`
    :raises ValueError: If *type* is not one of the supported file types.
    """
    if isinstance(buffer, _text_type):
        buffer = buffer.encode("ascii")
    source_bio = _new_mem_buf(buffer)

    if type == FILETYPE_ASN1:
        raw_key = _lib.d2i_PUBKEY_bio(source_bio, _ffi.NULL)
    elif type == FILETYPE_PEM:
        raw_key = _lib.PEM_read_bio_PUBKEY(
            source_bio, _ffi.NULL, _ffi.NULL, _ffi.NULL
        )
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    if raw_key == _ffi.NULL:
        _raise_current_error()

    public_key = PKey.__new__(PKey)
    public_key._pkey = _ffi.gc(raw_key, _lib.EVP_PKEY_free)
    public_key._only_public = True
    return public_key
2884
2885
def load_privatekey(type, buffer, passphrase=None):
    """
    Load a private key (PKey) from the string *buffer* encoded with the type
    *type*.

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
    :param buffer: The buffer the key is stored in; unicode input is
                   encoded as ASCII first
    :param passphrase: (optional) if encrypted PEM format, this can be
                       either the passphrase to use, or a callback for
                       providing the passphrase.

    :return: The PKey object
    :raises ValueError: If *type* is not a supported file type, or if a
                        passphrase is given for a non-PEM type.
    """
    if isinstance(buffer, _text_type):
        buffer = buffer.encode("ascii")
    source_bio = _new_mem_buf(buffer)

    # Constructing the helper rejects a passphrase for non-PEM input.
    passphrase_helper = _PassphraseHelper(type, passphrase)

    if type == FILETYPE_ASN1:
        raw_key = _lib.d2i_PrivateKey_bio(source_bio, _ffi.NULL)
    elif type == FILETYPE_PEM:
        raw_key = _lib.PEM_read_bio_PrivateKey(
            source_bio,
            _ffi.NULL,
            passphrase_helper.callback,
            passphrase_helper.callback_args,
        )
        # Surface any exception raised inside the passphrase callback.
        passphrase_helper.raise_if_problem()
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    if raw_key == _ffi.NULL:
        _raise_current_error()

    private_key = PKey.__new__(PKey)
    private_key._pkey = _ffi.gc(raw_key, _lib.EVP_PKEY_free)
    return private_key
2921
2922
def dump_certificate_request(type, req):
    """
    Dump the certificate request *req* into a buffer string encoded with the
    type *type*.

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1, or
        FILETYPE_TEXT)
    :param req: The certificate request to dump
    :return: The buffer holding the dumped certificate request
    :raises ValueError: If *type* is not one of the supported file types.
    """
    out_bio = _new_mem_buf()
    raw_req = req._req

    if type == FILETYPE_TEXT:
        status = _lib.X509_REQ_print_ex(out_bio, raw_req, 0, 0)
    elif type == FILETYPE_ASN1:
        status = _lib.i2d_X509_REQ_bio(out_bio, raw_req)
    elif type == FILETYPE_PEM:
        status = _lib.PEM_write_bio_X509_REQ(out_bio, raw_req)
    else:
        raise ValueError(
            "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
            "FILETYPE_TEXT"
        )

    _openssl_assert(status != 0)
    return _bio_to_string(out_bio)
2949
2950
def load_certificate_request(type, buffer):
    """
    Load a certificate request (X509Req) from the string *buffer* encoded with
    the type *type*.

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
    :param buffer: The buffer the certificate request is stored in; unicode
        input is encoded as ASCII first
    :return: The X509Req object
    :raises ValueError: If *type* is not one of the supported file types.
    """
    if isinstance(buffer, _text_type):
        buffer = buffer.encode("ascii")
    source_bio = _new_mem_buf(buffer)

    if type == FILETYPE_ASN1:
        raw_req = _lib.d2i_X509_REQ_bio(source_bio, _ffi.NULL)
    elif type == FILETYPE_PEM:
        raw_req = _lib.PEM_read_bio_X509_REQ(
            source_bio, _ffi.NULL, _ffi.NULL, _ffi.NULL
        )
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    # A NULL result means the request could not be parsed.
    _openssl_assert(raw_req != _ffi.NULL)

    request = X509Req.__new__(X509Req)
    request._req = _ffi.gc(raw_req, _lib.X509_REQ_free)
    return request
2977
2978
def sign(pkey, data, digest):
    """
    Sign a data string using the given key and message digest.

    :param pkey: PKey to sign with
    :param data: data to be signed
    :param digest: name of the message digest to use
    :return: signature
    :raises ValueError: If *digest* names no known digest method.

    .. versionadded:: 0.11
    """
    data = _text_to_bytes_and_warn("data", data)

    md = _lib.EVP_get_digestbyname(_byte_string(digest))
    if md == _ffi.NULL:
        raise ValueError("No such digest method")

    ctx = _ffi.gc(
        _lib.Cryptography_EVP_MD_CTX_new(),
        _lib.Cryptography_EVP_MD_CTX_free,
    )
    _lib.EVP_SignInit(ctx, md)
    _lib.EVP_SignUpdate(ctx, data, len(data))

    # The output buffer is sized from the key before the signature is made.
    capacity = _lib.EVP_PKEY_size(pkey._pkey)
    _openssl_assert(capacity > 0)
    out = _ffi.new("unsigned char[]", capacity)
    out_len = _ffi.new("unsigned int *")

    status = _lib.EVP_SignFinal(ctx, out, out_len, pkey._pkey)
    _openssl_assert(status == 1)

    # Return only the bytes EVP_SignFinal reported as written.
    return _ffi.buffer(out, out_len[0])[:]
3012
3013
def verify(cert, signature, data, digest):
    """
    Verify the signature for a data string.

    :param cert: signing certificate (X509 object) corresponding to the
        private key which generated the signature.
    :param signature: signature returned by sign function
    :param data: data to be verified
    :param digest: name of the message digest to use
    :return: ``None`` if the signature is correct, raise exception otherwise.
    :raises ValueError: If *digest* names no known digest method.

    .. versionadded:: 0.11
    """
    data = _text_to_bytes_and_warn("data", data)

    md = _lib.EVP_get_digestbyname(_byte_string(digest))
    if md == _ffi.NULL:
        raise ValueError("No such digest method")

    # The public key is taken from the certificate and released with it.
    public_key = _lib.X509_get_pubkey(cert._x509)
    _openssl_assert(public_key != _ffi.NULL)
    public_key = _ffi.gc(public_key, _lib.EVP_PKEY_free)

    ctx = _ffi.gc(
        _lib.Cryptography_EVP_MD_CTX_new(),
        _lib.Cryptography_EVP_MD_CTX_free,
    )
    _lib.EVP_VerifyInit(ctx, md)
    _lib.EVP_VerifyUpdate(ctx, data, len(data))

    outcome = _lib.EVP_VerifyFinal(
        ctx, signature, len(signature), public_key
    )
    # Anything other than 1 is reported through the OpenSSL error queue.
    if outcome != 1:
        _raise_current_error()
3048
3049
def dump_crl(type, crl):
    """
    Dump a certificate revocation list to a buffer.

    :param type: The file type (one of ``FILETYPE_PEM``, ``FILETYPE_ASN1``, or
        ``FILETYPE_TEXT``).
    :param CRL crl: The CRL to dump.

    :return: The buffer with the CRL.
    :rtype: bytes
    :raises ValueError: If *type* is not one of the supported file types.
    """
    out_bio = _new_mem_buf()
    raw_crl = crl._crl

    if type == FILETYPE_TEXT:
        status = _lib.X509_CRL_print(out_bio, raw_crl)
    elif type == FILETYPE_ASN1:
        status = _lib.i2d_X509_CRL_bio(out_bio, raw_crl)
    elif type == FILETYPE_PEM:
        status = _lib.PEM_write_bio_X509_CRL(out_bio, raw_crl)
    else:
        raise ValueError(
            "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
            "FILETYPE_TEXT"
        )

    _openssl_assert(status == 1)
    return _bio_to_string(out_bio)
3077
3078
def load_crl(type, buffer):
    """
    Load Certificate Revocation List (CRL) data from the string *buffer*
    encoded with the type *type*.

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
    :param buffer: The buffer the CRL is stored in; unicode input is encoded
        as ASCII first

    :return: The CRL object
    :raises ValueError: If *type* is not one of the supported file types.
    """
    if isinstance(buffer, _text_type):
        buffer = buffer.encode("ascii")
    source_bio = _new_mem_buf(buffer)

    if type == FILETYPE_ASN1:
        raw_crl = _lib.d2i_X509_CRL_bio(source_bio, _ffi.NULL)
    elif type == FILETYPE_PEM:
        raw_crl = _lib.PEM_read_bio_X509_CRL(
            source_bio, _ffi.NULL, _ffi.NULL, _ffi.NULL
        )
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    if raw_crl == _ffi.NULL:
        _raise_current_error()

    loaded = CRL.__new__(CRL)
    loaded._crl = _ffi.gc(raw_crl, _lib.X509_CRL_free)
    return loaded
3107
3108
def load_pkcs7_data(type, buffer):
    """
    Load pkcs7 data from the string *buffer* encoded with the type
    *type*.

    :param type: The file type (one of FILETYPE_PEM or FILETYPE_ASN1)
    :param buffer: The buffer with the pkcs7 data; unicode input is encoded
        as ASCII first.
    :return: The PKCS7 object
    :raises ValueError: If *type* is not one of the supported file types.
    """
    if isinstance(buffer, _text_type):
        buffer = buffer.encode("ascii")
    source_bio = _new_mem_buf(buffer)

    if type == FILETYPE_ASN1:
        raw_pkcs7 = _lib.d2i_PKCS7_bio(source_bio, _ffi.NULL)
    elif type == FILETYPE_PEM:
        raw_pkcs7 = _lib.PEM_read_bio_PKCS7(
            source_bio, _ffi.NULL, _ffi.NULL, _ffi.NULL
        )
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    if raw_pkcs7 == _ffi.NULL:
        _raise_current_error()

    loaded = PKCS7.__new__(PKCS7)
    loaded._pkcs7 = _ffi.gc(raw_pkcs7, _lib.PKCS7_free)
    return loaded


# Rebind the public name to a wrapper that emits a DeprecationWarning.
load_pkcs7_data = utils.deprecated(
    load_pkcs7_data,
    __name__,
    "PKCS#7 support in pyOpenSSL is deprecated. You should use the APIs "
    "in cryptography.",
    DeprecationWarning,
)
3147
3148
def load_pkcs12(buffer, passphrase=None):
    """
    Load pkcs12 data from the string *buffer*. If the pkcs12 structure is
    encrypted, a *passphrase* must be included.  The MAC is always
    checked and thus required.

    See also the man page for the C function :py:func:`PKCS12_parse`.

    :param buffer: The buffer the certificate is stored in; unicode input is
        encoded as ASCII first
    :param passphrase: (Optional) The password to decrypt the PKCS12 lump
    :returns: The PKCS12 object
    :raises Error: If the buffer cannot be decoded as PKCS12 or parsing it
        with the given passphrase fails.
    """
    passphrase = _text_to_bytes_and_warn("passphrase", passphrase)

    if isinstance(buffer, _text_type):
        buffer = buffer.encode("ascii")

    bio = _new_mem_buf(buffer)

    # Use null passphrase if passphrase is None or empty string. With PKCS#12
    # password based encryption no password and a zero length password are two
    # different things, but OpenSSL implementation will try both to figure out
    # which one works.
    if not passphrase:
        passphrase = _ffi.NULL

    p12 = _lib.d2i_PKCS12_bio(bio, _ffi.NULL)
    if p12 == _ffi.NULL:
        _raise_current_error()
    p12 = _ffi.gc(p12, _lib.PKCS12_free)

    # Output slots filled in by PKCS12_parse.
    pkey = _ffi.new("EVP_PKEY**")
    cert = _ffi.new("X509**")
    cacerts_out = _ffi.new("Cryptography_STACK_OF_X509**")

    parse_result = _lib.PKCS12_parse(p12, passphrase, pkey, cert, cacerts_out)
    if not parse_result:
        _raise_current_error()

    # Only the stack container is released here; each certificate in it is
    # wrapped in its own X509 object below.
    cacerts = _ffi.gc(cacerts_out[0], _lib.sk_X509_free)

    # openssl 1.0.0 sometimes leaves an X509_check_private_key error in the
    # queue for no particular reason.  This error isn't interesting to anyone
    # outside this function.  It's not even interesting to us.  Get rid of it.
    try:
        _raise_current_error()
    except Error:
        pass

    if pkey[0] == _ffi.NULL:
        pykey = None
    else:
        pykey = PKey.__new__(PKey)
        pykey._pkey = _ffi.gc(pkey[0], _lib.EVP_PKEY_free)

    if cert[0] == _ffi.NULL:
        pycert = None
        friendlyname = None
    else:
        pycert = X509._from_raw_x509_ptr(cert[0])

        friendlyname_length = _ffi.new("int*")
        friendlyname_buffer = _lib.X509_alias_get0(
            cert[0], friendlyname_length
        )
        # Check for a missing alias *before* touching the pointer, so no
        # buffer is ever built on top of NULL.
        if friendlyname_buffer == _ffi.NULL:
            friendlyname = None
        else:
            friendlyname = _ffi.buffer(
                friendlyname_buffer, friendlyname_length[0]
            )[:]

    pycacerts = [
        X509._from_raw_x509_ptr(_lib.sk_X509_value(cacerts, i))
        for i in range(_lib.sk_X509_num(cacerts))
    ]
    if not pycacerts:
        # An absent or empty CA list is reported as None, not [].
        pycacerts = None

    pkcs12 = PKCS12.__new__(PKCS12)
    pkcs12._pkey = pykey
    pkcs12._cert = pycert
    pkcs12._cacerts = pycacerts
    pkcs12._friendlyname = friendlyname
    return pkcs12


# Rebind the public name to a wrapper that emits a DeprecationWarning.
load_pkcs12 = utils.deprecated(
    load_pkcs12,
    __name__,
    (
        "PKCS#12 support in pyOpenSSL is deprecated. You should use the APIs "
        "in cryptography."
    ),
    DeprecationWarning,
)
3245
3246
# There are no direct unit tests for this initialization.  It is tested
# indirectly since it is necessary for functions like dump_privatekey when
# using encryption.
#
# Thus OpenSSL.test.test_crypto.FunctionTests.test_dump_privatekey_passphrase
# and some other similar tests may fail without this.  They may still pass,
# however, if the Python runtime has already done some initialization of the
# underlying OpenSSL library and is linked against the same one that
# cryptography is using.
_lib.OpenSSL_add_all_algorithms()

# This is similar but exercised mainly by exception_from_error_queue.  It calls
# both ERR_load_crypto_strings() and ERR_load_SSL_strings().
_lib.SSL_load_error_strings()


# Set the default string mask to match OpenSSL upstream (since 2005) and
# RFC5280 recommendations.
_lib.ASN1_STRING_set_default_mask_asc(b"utf8only")
3266