• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1import datetime
2
3from base64 import b16encode
4from functools import partial
5from operator import __eq__, __ne__, __lt__, __le__, __gt__, __ge__
6
7from six import (
8    integer_types as _integer_types,
9    text_type as _text_type,
10    PY3 as _PY3)
11
12from cryptography import x509
13from cryptography.hazmat.primitives.asymmetric import dsa, rsa
14from cryptography.utils import deprecated
15
16from OpenSSL._util import (
17    ffi as _ffi,
18    lib as _lib,
19    exception_from_error_queue as _exception_from_error_queue,
20    byte_string as _byte_string,
21    native as _native,
22    UNSPECIFIED as _UNSPECIFIED,
23    text_to_bytes_and_warn as _text_to_bytes_and_warn,
24    make_assert as _make_assert,
25)
26
# Names exported by a star-import of this module.  Helper functions and
# aliases whose names are not listed here (for example the deprecated
# ``*Type`` aliases) stay importable by explicit name only.
__all__ = [
    'FILETYPE_PEM',
    'FILETYPE_ASN1',
    'FILETYPE_TEXT',
    'TYPE_RSA',
    'TYPE_DSA',
    'Error',
    'PKey',
    'get_elliptic_curves',
    'get_elliptic_curve',
    'X509Name',
    'X509Extension',
    'X509Req',
    'X509',
    'X509StoreFlags',
    'X509Store',
    'X509StoreContextError',
    'X509StoreContext',
    'load_certificate',
    'dump_certificate',
    'dump_publickey',
    'dump_privatekey',
    'Revoked',
    'CRL',
    'PKCS7',
    'PKCS12',
    'NetscapeSPKI',
    'load_publickey',
    'load_privatekey',
    'dump_certificate_request',
    'load_certificate_request',
    'sign',
    'verify',
    'dump_crl',
    'load_crl',
    'load_pkcs7_data',
    'load_pkcs12'
]
65
# Serialization format selectors, taken directly from the OpenSSL binding's
# SSL_FILETYPE_* values.
FILETYPE_PEM = _lib.SSL_FILETYPE_PEM
FILETYPE_ASN1 = _lib.SSL_FILETYPE_ASN1

# TODO This was an API mistake.  OpenSSL has no such constant.
# The value (65535) is a module-local sentinel, not something read from the
# binding.
FILETYPE_TEXT = 2 ** 16 - 1

# Key type identifiers, taken directly from the binding's EVP_PKEY_* values.
# PKey.generate_key in this file only handles TYPE_RSA and TYPE_DSA.
TYPE_RSA = _lib.EVP_PKEY_RSA
TYPE_DSA = _lib.EVP_PKEY_DSA
TYPE_DH = _lib.EVP_PKEY_DH
TYPE_EC = _lib.EVP_PKEY_EC
76
77
class Error(Exception):
    """
    Raised when an `OpenSSL.crypto` API call fails.

    This is the exception type bound into the module's error-queue helpers,
    so failures reported by the underlying library surface as instances of
    this class.
    """
82
83
# ``_exception_from_error_queue`` with ``Error`` pre-bound as its first
# argument, so callers in this module can invoke it with no arguments.
_raise_current_error = partial(_exception_from_error_queue, Error)
# Assertion helper produced by ``_make_assert`` for ``Error``; it is called
# throughout this module as ``_openssl_assert(<condition>)``.
# NOTE(review): presumably raises ``Error`` when the condition is false --
# confirm against OpenSSL._util.make_assert.
_openssl_assert = _make_assert(Error)
86
87
def _get_backend():
    """
    Return the ``cryptography`` OpenSSL backend object, importing it lazily.

    Importing the backend from cryptography has the side effect of activating
    the osrandom engine. This mutates the global state of OpenSSL in the
    process and causes issues for various programs that use subinterpreters or
    embed Python. By putting the import in this function we can avoid
    triggering this side effect unless _get_backend is called.

    :return: The ``backend`` object exported by
        ``cryptography.hazmat.backends.openssl.backend``.
    """
    # Deliberately a function-scope import: see the docstring for why this
    # must not run at module import time.
    from cryptography.hazmat.backends.openssl.backend import backend
    return backend
98
99
def _untested_error(where):
    """
    Report a failure from an OpenSSL API on a code path the test suite does
    not exercise.

    :param where: Name of the OpenSSL API that failed; interpolated into the
        error message.

    :raise RuntimeError: Always.
    """
    message = "Unknown %s failure" % (where,)
    raise RuntimeError(message)
107
108
def _new_mem_buf(buffer=None):
    """
    Create an OpenSSL memory BIO whose lifetime is tied to the returned
    cffi object.

    :param buffer: ``None`` for an empty BIO, or bytes to pre-load into the
        BIO so that they can be read back out.

    :return: A garbage-collected BIO pointer.
    """
    if buffer is not None:
        backing = _ffi.new("char[]", buffer)
        bio = _lib.BIO_new_mem_buf(backing, len(buffer))

        # The default argument pins the cffi allocation so the bytes stay
        # valid for as long as the BIO (and therefore this finalizer) lives.
        def release(handle, _keepalive=backing):
            return _lib.BIO_free(handle)
    else:
        bio = _lib.BIO_new(_lib.BIO_s_mem())
        release = _lib.BIO_free

    _openssl_assert(bio != _ffi.NULL)

    return _ffi.gc(bio, release)
133
134
def _bio_to_string(bio):
    """
    Return a copy of the bytes currently held by an OpenSSL memory BIO.

    :param bio: The BIO to read from.
    :return: The BIO contents as a byte string.
    """
    data_ptr = _ffi.new('char**')
    size = _lib.BIO_get_mem_data(bio, data_ptr)
    view = _ffi.buffer(data_ptr[0], size)
    # Slicing the cffi buffer copies the data into a Python byte string.
    return view[:]
142
143
def _set_asn1_time(boundary, when):
    """
    Set the time value of an ASN1 time object.

    @param boundary: An ASN1_TIME pointer (or an object safely
        castable to that type) which will have its value set.
    @param when: A byte string representation of the desired time value.

    @raise TypeError: If C{when} is not a L{bytes} string.
    @raise ValueError: If OpenSSL rejects C{when} (``ASN1_TIME_set_string``
        returns 0).
    """
    if not isinstance(when, bytes):
        raise TypeError("when must be a byte string")

    if _lib.ASN1_TIME_set_string(boundary, when) == 0:
        raise ValueError("Invalid string")
164
165
def _get_asn1_time(timestamp):
    """
    Read the time value out of an ASN1 time object.

    @param timestamp: An ASN1_GENERALIZEDTIME* (or an object safely castable to
        that type) from which the time value will be retrieved.

    @return: The time value from C{timestamp} as a L{bytes} string, or
        C{None} if the object contains no time value.
    """
    as_string = _ffi.cast('ASN1_STRING*', timestamp)

    if _lib.ASN1_STRING_length(as_string) == 0:
        return None

    if _lib.ASN1_STRING_type(as_string) == _lib.V_ASN1_GENERALIZEDTIME:
        # Already in generalized form: hand back the raw data.
        return _ffi.string(_lib.ASN1_STRING_data(as_string))

    # Any other time type is converted to a temporary GENERALIZEDTIME first.
    converted_ptr = _ffi.new("ASN1_GENERALIZEDTIME**")
    _lib.ASN1_TIME_to_generalizedtime(timestamp, converted_ptr)
    if converted_ptr[0] == _ffi.NULL:
        # Possible causes:
        #   - timestamp was not an ASN1_TIME
        #   - allocating the ASN1_GENERALIZEDTIME failed
        #   - the time data could not be copied into the new object
        #
        # None of these is practical to trigger in tests: cffi enforces the
        # ASN1_TIME type, and allocation failures are hard to provoke
        # deterministically.
        _untested_error("ASN1_TIME_to_generalizedtime")
    else:
        converted = _ffi.cast("ASN1_STRING*", converted_ptr[0])
        text = _ffi.string(_lib.ASN1_STRING_data(converted))
        # The temporary belongs to us; release it once the bytes are copied.
        _lib.ASN1_GENERALIZEDTIME_free(converted_ptr[0])
        return text
204
205
class _X509NameInvalidator(object):
    """
    Track wrapper objects that expose a ``_name`` attribute so they can all
    be disabled at once.
    """

    def __init__(self):
        # Objects registered since the last call to clear().
        self._names = []

    def add(self, name):
        """
        Register *name* so that the next :meth:`clear` invalidates it.

        :param name: An object with a ``_name`` attribute.
        :return: ``None``
        """
        self._names.append(name)

    def clear(self):
        """
        Invalidate every registered object and forget about it.

        Each registered object has its ``_name`` attribute deleted.  The
        registry is emptied first, so calling this method again does not try
        to delete ``_name`` from objects that were already invalidated (which
        would raise :py:class:`AttributeError`) and the list does not grow
        without bound.

        :return: ``None``
        """
        pending, self._names = self._names, []
        for name in pending:
            # Breaks the object, but also prevents UAF!
            del name._name
217
218
class PKey(object):
    """
    A class representing an DSA or RSA public key or key pair.

    The wrapped OpenSSL ``EVP_PKEY`` is stored on ``_pkey``.
    """
    # Class-level defaults.  They matter for instances built with
    # ``PKey.__new__`` (bypassing ``__init__``), which therefore start out as
    # "initialized, not public-only" unless the creator overrides them.
    _only_public = False
    _initialized = True

    def __init__(self):
        pkey = _lib.EVP_PKEY_new()
        _openssl_assert(pkey != _ffi.NULL)
        self._pkey = _ffi.gc(pkey, _lib.EVP_PKEY_free)
        # No key material yet; generate_key() flips this back to True.
        self._initialized = False

    def to_cryptography_key(self):
        """
        Export as a ``cryptography`` key.

        :rtype: One of ``cryptography``'s `key interfaces`_.

        .. _key interfaces: https://cryptography.io/en/latest/hazmat/\
            primitives/asymmetric/rsa/#key-interfaces

        .. versionadded:: 16.1.0
        """
        backend = _get_backend()
        if self._only_public:
            return backend._evp_pkey_to_public_key(self._pkey)
        else:
            return backend._evp_pkey_to_private_key(self._pkey)

    @classmethod
    def from_cryptography_key(cls, crypto_key):
        """
        Construct based on a ``cryptography`` *crypto_key*.

        :param crypto_key: A ``cryptography`` key.
        :type crypto_key: One of ``cryptography``'s `key interfaces`_.

        :rtype: PKey

        :raises TypeError: If *crypto_key* is not an RSA or DSA public or
            private key.

        .. versionadded:: 16.1.0
        """
        # Validate before constructing so a rejected argument does not cost
        # an EVP_PKEY allocation.
        if not isinstance(crypto_key, (rsa.RSAPublicKey, rsa.RSAPrivateKey,
                                       dsa.DSAPublicKey, dsa.DSAPrivateKey)):
            raise TypeError("Unsupported key type")

        pkey = cls()
        pkey._pkey = crypto_key._evp_pkey
        if isinstance(crypto_key, (rsa.RSAPublicKey, dsa.DSAPublicKey)):
            pkey._only_public = True
        pkey._initialized = True
        return pkey

    def generate_key(self, type, bits):
        """
        Generate a key pair of the given type, with the given number of bits.

        This generates a key "into" this object.

        :param type: The key type.
        :type type: :py:data:`TYPE_RSA` or :py:data:`TYPE_DSA`
        :param bits: The number of bits.
        :type bits: :py:data:`int` ``>= 0``
        :raises TypeError: If :py:data:`type` or :py:data:`bits` isn't
            of the appropriate type.
        :raises ValueError: If the number of bits isn't an integer of
            the appropriate size.
        :raises Error: If :py:data:`type` is not a supported key type or an
            OpenSSL call fails.
        :return: ``None``
        """
        if not isinstance(type, int):
            raise TypeError("type must be an integer")

        if not isinstance(bits, int):
            raise TypeError("bits must be an integer")

        if type == TYPE_RSA:
            if bits <= 0:
                raise ValueError("Invalid number of bits")

            exponent = _lib.BN_new()
            _openssl_assert(exponent != _ffi.NULL)
            exponent = _ffi.gc(exponent, _lib.BN_free)
            _openssl_assert(_lib.BN_set_word(exponent, _lib.RSA_F4) == 1)

            rsa_key = _lib.RSA_new()
            _openssl_assert(rsa_key != _ffi.NULL)

            # The RSA struct is ours until EVP_PKEY_assign_RSA succeeds and
            # takes ownership.  If anything fails before that, free it here
            # or it leaks.
            owned = True
            try:
                result = _lib.RSA_generate_key_ex(
                    rsa_key, bits, exponent, _ffi.NULL
                )
                _openssl_assert(result == 1)

                result = _lib.EVP_PKEY_assign_RSA(self._pkey, rsa_key)
                _openssl_assert(result == 1)
                owned = False
            finally:
                if owned:
                    _lib.RSA_free(rsa_key)

        elif type == TYPE_DSA:
            dsa_key = _lib.DSA_new()
            _openssl_assert(dsa_key != _ffi.NULL)

            # set1 below takes its own reference, so the local one is
            # released by the garbage collector.
            dsa_key = _ffi.gc(dsa_key, _lib.DSA_free)
            res = _lib.DSA_generate_parameters_ex(
                dsa_key, bits, _ffi.NULL, 0, _ffi.NULL, _ffi.NULL, _ffi.NULL
            )
            _openssl_assert(res == 1)

            _openssl_assert(_lib.DSA_generate_key(dsa_key) == 1)
            _openssl_assert(
                _lib.EVP_PKEY_set1_DSA(self._pkey, dsa_key) == 1
            )
        else:
            raise Error("No such key type")

        self._initialized = True

    def check(self):
        """
        Check the consistency of an RSA private key.

        This is the Python equivalent of OpenSSL's ``RSA_check_key``.

        :return: ``True`` if key is consistent.

        :raise OpenSSL.crypto.Error: if the key is inconsistent.

        :raise TypeError: if the key is of a type which cannot be checked.
            Only RSA keys can currently be checked.
        """
        if self._only_public:
            raise TypeError("public key only")

        if _lib.EVP_PKEY_type(self.type()) != _lib.EVP_PKEY_RSA:
            raise TypeError("key type unsupported")

        rsa_key = _lib.EVP_PKEY_get1_RSA(self._pkey)
        rsa_key = _ffi.gc(rsa_key, _lib.RSA_free)
        result = _lib.RSA_check_key(rsa_key)
        if result:
            return True
        _raise_current_error()

    def type(self):
        """
        Returns the type of the key

        :return: The type of the key, as reported by ``EVP_PKEY_id``.
        """
        return _lib.EVP_PKEY_id(self._pkey)

    def bits(self):
        """
        Returns the number of bits of the key

        :return: The number of bits of the key, as reported by
            ``EVP_PKEY_bits``.
        """
        return _lib.EVP_PKEY_bits(self._pkey)
368
369
# Legacy alias for PKey, wrapped with cryptography's ``deprecated`` helper.
# NOTE(review): presumably emits the DeprecationWarning below when the alias
# is accessed -- confirm against cryptography.utils.deprecated.
PKeyType = deprecated(
    PKey, __name__,
    "PKeyType has been deprecated, use PKey instead",
    DeprecationWarning
)
375
376
class _EllipticCurve(object):
    """
    One elliptic curve supported by the linked OpenSSL.

    @cvar _curves: :py:obj:`None` until the curves are first loaded;
        afterwards a :py:type:`set` of :py:type:`_EllipticCurve` instances,
        one per curve supported by the system.
    @type _curves: :py:type:`NoneType` or :py:type:`set`
    """
    _curves = None

    if _PY3:
        # Only needed on Python 3; on Python 2 this definition is broken.
        def __ne__(self, other):
            """
            Let the right-hand operand of ``!=`` take part in the comparison
            when it is not an :py:class:`_EllipticCurve`.

            Python 3 seems to have dropped this cooperation in this very
            narrow circumstance.
            """
            if not isinstance(other, _EllipticCurve):
                return NotImplemented
            return super(_EllipticCurve, self).__ne__(other)

    def __init__(self, lib, nid, name):
        """
        :param lib: The :py:mod:`cryptography` binding instance used to
            interface with OpenSSL.

        :param nid: The OpenSSL NID identifying the curve this object
            represents.
        :type nid: :py:class:`int`

        :param name: The OpenSSL short name identifying the curve this object
            represents.
        :type name: :py:class:`unicode`
        """
        self._lib = lib
        self._nid = nid
        self.name = name

    def __repr__(self):
        return "<Curve %r>" % (self.name,)

    @classmethod
    def from_nid(cls, lib, nid):
        """
        Build a :py:class:`_EllipticCurve` for the given OpenSSL NID.

        :param lib: The OpenSSL library binding object.

        :param nid: The OpenSSL NID the resulting curve object will represent.
            This must be a curve NID (and not, for example, a hash NID) or
            subsequent operations will fail in unpredictable ways.
        :type nid: :py:class:`int`

        :return: The curve object.
        """
        short_name = _ffi.string(lib.OBJ_nid2sn(nid)).decode("ascii")
        return cls(lib, nid, short_name)

    @classmethod
    def _load_elliptic_curves(cls, lib):
        """
        Ask OpenSSL for its built-in curves.

        :param lib: The OpenSSL library binding object.

        :return: A :py:type:`set` of ``cls`` instances, one per elliptic
            curve the underlying library supports.
        """
        # First call with a NULL buffer only reports how many curves exist.
        count = lib.EC_get_builtin_curves(_ffi.NULL, 0)
        table = _ffi.new('EC_builtin_curve[]', count)
        # The second call should return ``count`` again.  The result is not
        # checked: there is no sensible recovery if it differs.
        lib.EC_get_builtin_curves(table, count)
        return {cls.from_nid(lib, entry.nid) for entry in table}

    @classmethod
    def _get_elliptic_curves(cls, lib):
        """
        Return the supported curves, loading and caching them on first use.

        :param lib: The OpenSSL library binding object.

        :return: A :py:type:`set` of ``cls`` instances, one per elliptic
            curve the underlying library supports.
        """
        if cls._curves is not None:
            return cls._curves
        cls._curves = cls._load_elliptic_curves(lib)
        return cls._curves

    def _to_EC_KEY(self):
        """
        Allocate a new OpenSSL EC_KEY structure set up for this curve.

        The structure is freed automatically when the returned cffi object
        is garbage collected.
        """
        ec_key = self._lib.EC_KEY_new_by_curve_name(self._nid)
        return _ffi.gc(ec_key, _lib.EC_KEY_free)
481
482
def get_elliptic_curves():
    """
    Return the set of elliptic curves supported by the OpenSSL build in use.

    Each curve object identifies itself through a :py:class:`unicode`
    ``name`` attribute.

    The curve objects are useful as values for the argument accepted by
    :py:meth:`Context.set_tmp_ecdh` to specify which elliptical curve should be
    used for ECDHE key exchange.
    """
    supported = _EllipticCurve._get_elliptic_curves(_lib)
    return supported
496
497
def get_elliptic_curve(name):
    """
    Look up a single curve object by its name.

    See :py:func:`get_elliptic_curves` for information about curve objects.

    :param name: The OpenSSL short name identifying the curve object to
        retrieve.
    :type name: :py:class:`unicode`

    :raises ValueError: If no supported curve has that name.
    """
    candidates = (
        curve for curve in get_elliptic_curves() if curve.name == name
    )
    match = next(candidates, None)
    if match is None:
        raise ValueError("unknown curve name", name)
    return match
514
515
class X509Name(object):
    """
    An X.509 Distinguished Name.

    :ivar countryName: The country of the entity.
    :ivar C: Alias for  :py:attr:`countryName`.

    :ivar stateOrProvinceName: The state or province of the entity.
    :ivar ST: Alias for :py:attr:`stateOrProvinceName`.

    :ivar localityName: The locality of the entity.
    :ivar L: Alias for :py:attr:`localityName`.

    :ivar organizationName: The organization name of the entity.
    :ivar O: Alias for :py:attr:`organizationName`.

    :ivar organizationalUnitName: The organizational unit of the entity.
    :ivar OU: Alias for :py:attr:`organizationalUnitName`

    :ivar commonName: The common name of the entity.
    :ivar CN: Alias for :py:attr:`commonName`.

    :ivar emailAddress: The e-mail address of the entity.
    """

    def __init__(self, name):
        """
        Create a new X509Name, copying the given X509Name instance.

        :param name: The name to copy.
        :type name: :py:class:`X509Name`
        """
        name = _lib.X509_NAME_dup(name._name)
        self._name = _ffi.gc(name, _lib.X509_NAME_free)

    def __setattr__(self, name, value):
        """
        Set a name component, replacing an existing entry of the same kind.

        Attribute names starting with an underscore are stored as ordinary
        Python attributes.  Any other name is resolved to an OpenSSL NID.

        :raises TypeError: If *name* is not exactly a :py:class:`str`.
        :raises AttributeError: If OpenSSL does not recognise *name*.
        :raises Error: If OpenSSL fails to add the entry.
        """
        if name.startswith('_'):
            return super(X509Name, self).__setattr__(name, value)

        # Note: we really do not want str subclasses here, so we do not use
        # isinstance.
        if type(name) is not str:
            # Report the type of the offending *name*, not of the value.
            raise TypeError("attribute name must be string, not '%.200s'" % (
                type(name).__name__,))

        nid = _lib.OBJ_txt2nid(_byte_string(name))
        if nid == _lib.NID_undef:
            # The failed lookup leaves an entry on the OpenSSL error queue;
            # drain it so it is not misattributed to a later call.
            try:
                _raise_current_error()
            except Error:
                pass
            raise AttributeError("No such attribute")

        # If there's an old entry for this NID, remove it
        for i in range(_lib.X509_NAME_entry_count(self._name)):
            ent = _lib.X509_NAME_get_entry(self._name, i)
            ent_obj = _lib.X509_NAME_ENTRY_get_object(ent)
            ent_nid = _lib.OBJ_obj2nid(ent_obj)
            if nid == ent_nid:
                ent = _lib.X509_NAME_delete_entry(self._name, i)
                _lib.X509_NAME_ENTRY_free(ent)
                break

        if isinstance(value, _text_type):
            value = value.encode('utf-8')

        add_result = _lib.X509_NAME_add_entry_by_NID(
            self._name, nid, _lib.MBSTRING_UTF8, value, -1, -1, 0)
        if not add_result:
            _raise_current_error()

    def __getattr__(self, name):
        """
        Find attribute. An X509Name object has the following attributes:
        countryName (alias C), stateOrProvince (alias ST), locality (alias L),
        organization (alias O), organizationalUnit (alias OU), commonName
        (alias CN) and more...

        :return: The UTF-8 decoded value of the component, or ``None`` if the
            name has no entry for it.
        :raises AttributeError: If OpenSSL does not recognise *name*.
        """
        nid = _lib.OBJ_txt2nid(_byte_string(name))
        if nid == _lib.NID_undef:
            # This is a bit weird.  OBJ_txt2nid indicated failure, but it seems
            # a lower level function, a2d_ASN1_OBJECT, also feels the need to
            # push something onto the error queue.  If we don't clean that up
            # now, someone else will bump into it later and be quite confused.
            # See lp#314814.
            try:
                _raise_current_error()
            except Error:
                pass
            # object defines no __getattr__ to delegate to, so raise the
            # AttributeError directly with a meaningful message.
            raise AttributeError("No such attribute")

        entry_index = _lib.X509_NAME_get_index_by_NID(self._name, nid, -1)
        if entry_index == -1:
            return None

        entry = _lib.X509_NAME_get_entry(self._name, entry_index)
        data = _lib.X509_NAME_ENTRY_get_data(entry)

        result_buffer = _ffi.new("unsigned char**")
        data_length = _lib.ASN1_STRING_to_UTF8(result_buffer, data)
        _openssl_assert(data_length >= 0)

        try:
            result = _ffi.buffer(
                result_buffer[0], data_length
            )[:].decode('utf-8')
        finally:
            # XXX untested
            # Free the OpenSSL-allocated buffer even if decoding fails.
            _lib.OPENSSL_free(result_buffer[0])
        return result

    def _cmp(op):
        # Class-body helper: builds a rich-comparison method that applies
        # *op* to the result of X509_NAME_cmp and 0.
        def f(self, other):
            if not isinstance(other, X509Name):
                return NotImplemented
            result = _lib.X509_NAME_cmp(self._name, other._name)
            return op(result, 0)
        return f

    # The right-hand names resolve to the functions imported from
    # ``operator`` at module level.
    __eq__ = _cmp(__eq__)
    __ne__ = _cmp(__ne__)

    __lt__ = _cmp(__lt__)
    __le__ = _cmp(__le__)

    __gt__ = _cmp(__gt__)
    __ge__ = _cmp(__ge__)

    def __repr__(self):
        """
        String representation of an X509Name
        """
        # X509_NAME_oneline writes into a fixed 512-byte buffer.
        result_buffer = _ffi.new("char[]", 512)
        format_result = _lib.X509_NAME_oneline(
            self._name, result_buffer, len(result_buffer))
        _openssl_assert(format_result != _ffi.NULL)

        return "<X509Name object '%s'>" % (
            _native(_ffi.string(result_buffer)),)

    def hash(self):
        """
        Return the integer hash OpenSSL computes for this name.

        This is the Python equivalent of OpenSSL's ``X509_NAME_hash``.

        NOTE(review): earlier documentation described the value as the first
        four bytes of the MD5 digest of the DER representation of the name;
        the digest actually used is decided by the linked OpenSSL -- confirm.

        :return: The (integer) hash of this name.
        :rtype: :py:class:`int`
        """
        return _lib.X509_NAME_hash(self._name)

    def der(self):
        """
        Return the DER encoding of this name.

        :return: The DER encoded form of this name.
        :rtype: :py:class:`bytes`
        """
        result_buffer = _ffi.new('unsigned char**')
        encode_result = _lib.i2d_X509_NAME(self._name, result_buffer)
        _openssl_assert(encode_result >= 0)

        # Copy the bytes out before releasing the OpenSSL-allocated buffer.
        string_result = _ffi.buffer(result_buffer[0], encode_result)[:]
        _lib.OPENSSL_free(result_buffer[0])
        return string_result

    def get_components(self):
        """
        Returns the components of this name, as a sequence of 2-tuples.

        :return: The components of this name.
        :rtype: :py:class:`list` of ``name, value`` tuples.
        """
        result = []
        for i in range(_lib.X509_NAME_entry_count(self._name)):
            ent = _lib.X509_NAME_get_entry(self._name, i)

            fname = _lib.X509_NAME_ENTRY_get_object(ent)
            fval = _lib.X509_NAME_ENTRY_get_data(ent)

            nid = _lib.OBJ_obj2nid(fname)
            name = _lib.OBJ_nid2sn(nid)

            # ffi.string does not handle strings containing NULL bytes
            # (which may have been generated by old, broken software)
            value = _ffi.buffer(_lib.ASN1_STRING_data(fval),
                                _lib.ASN1_STRING_length(fval))[:]
            result.append((_ffi.string(name), value))

        return result
707
708
# Legacy alias for X509Name, wrapped with cryptography's ``deprecated``
# helper.
# NOTE(review): presumably emits the DeprecationWarning below when the alias
# is accessed -- confirm against cryptography.utils.deprecated.
X509NameType = deprecated(
    X509Name, __name__,
    "X509NameType has been deprecated, use X509Name instead",
    DeprecationWarning
)
714
715
class X509Extension(object):
    """
    An X.509 v3 certificate extension.
    """

    def __init__(self, type_name, critical, value, subject=None, issuer=None):
        """
        Build an X509 extension from its textual configuration.

        :param type_name: The name of the type of extension_ to create.
        :type type_name: :py:data:`bytes`

        :param bool critical: A flag indicating whether this is a critical
            extension.

        :param value: The value of the extension.
        :type value: :py:data:`bytes`

        :param subject: Optional X509 certificate to use as subject.
        :type subject: :py:class:`X509`

        :param issuer: Optional X509 certificate to use as issuer.
        :type issuer: :py:class:`X509`

        :raises TypeError: If *issuer* or *subject* is given but is not an
            :py:class:`X509` instance.

        .. _extension: https://www.openssl.org/docs/manmaster/man5/
            x509v3_config.html#STANDARD-EXTENSIONS
        """
        ctx = _ffi.new("X509V3_CTX*")

        # Extensions that go through the r2i conversion need a context;
        # X509V3_EXT_nconf may segfault when handed a NULL ctx.  Begin with
        # most fields set to NULL.
        _lib.X509V3_set_ctx(ctx, _ffi.NULL, _ffi.NULL, _ffi.NULL, _ffi.NULL, 0)

        # No configuration database is supplied, although some extensions
        # might want one.
        _lib.X509V3_set_ctx_nodb(ctx)

        # ctx is local and none of the X509V3_* calls made here appear to
        # steal references, so the certificates are attached without copying
        # or touching reference counts.
        if issuer is not None:
            if not isinstance(issuer, X509):
                raise TypeError("issuer must be an X509 instance")
            ctx.issuer_cert = issuer._x509
        if subject is not None:
            if not isinstance(subject, X509):
                raise TypeError("subject must be an X509 instance")
            ctx.subject_cert = subject._x509

        # Criticality is passed in-band as a prefix of the value string.
        # Other OpenSSL APIs accept it separately but are harder to drive
        # (X509V3_EXT_i2d looks like the better fit, but it needs an
        # ext_struc argument we have no obvious way to obtain).
        config_value = b"critical," + value if critical else value

        raw_extension = _lib.X509V3_EXT_nconf(
            _ffi.NULL, ctx, type_name, config_value)
        if raw_extension == _ffi.NULL:
            _raise_current_error()
        self._extension = _ffi.gc(raw_extension, _lib.X509_EXTENSION_free)

    @property
    def _nid(self):
        # NID of this extension's object identifier.
        ext_object = _lib.X509_EXTENSION_get_object(self._extension)
        return _lib.OBJ_obj2nid(ext_object)

    # GENERAL_NAME types rendered as "<label>:<value>" by
    # _subjectAltNameString; every other type is printed by OpenSSL.
    _prefixes = {
        _lib.GEN_EMAIL: "email",
        _lib.GEN_DNS: "DNS",
        _lib.GEN_URI: "URI",
    }

    def _subjectAltNameString(self):
        # Render the decoded subjectAltName entries as a comma-separated
        # string.
        decoded = _lib.X509V3_EXT_d2i(self._extension)
        names = _ffi.gc(
            _ffi.cast("GENERAL_NAMES*", decoded), _lib.GENERAL_NAMES_free
        )

        rendered = []
        for index in range(_lib.sk_GENERAL_NAME_num(names)):
            entry = _lib.sk_GENERAL_NAME_value(names, index)
            if entry.type in self._prefixes:
                ia5 = entry.d.ia5
                text = _native(_ffi.buffer(ia5.data, ia5.length)[:])
                rendered.append(self._prefixes[entry.type] + ":" + text)
            else:
                bio = _new_mem_buf()
                _lib.GENERAL_NAME_print(bio, entry)
                rendered.append(_native(_bio_to_string(bio)))
        return ", ".join(rendered)

    def __str__(self):
        """
        :return: a nice text representation of the extension
        """
        if self._nid == _lib.NID_subject_alt_name:
            return self._subjectAltNameString()

        bio = _new_mem_buf()
        printed = _lib.X509V3_EXT_print(bio, self._extension, 0, 0)
        _openssl_assert(printed != 0)

        return _native(_bio_to_string(bio))

    def get_critical(self):
        """
        Returns the critical field of this X.509 extension.

        :return: The critical field, as returned by
            ``X509_EXTENSION_get_critical``.
        """
        return _lib.X509_EXTENSION_get_critical(self._extension)

    def get_short_name(self):
        """
        Returns the short type name of this X.509 extension.

        The result is a byte string such as :py:const:`b"basicConstraints"`.

        :return: The short type name.
        :rtype: :py:data:`bytes`

        .. versionadded:: 0.12
        """
        ext_object = _lib.X509_EXTENSION_get_object(self._extension)
        short_name = _lib.OBJ_nid2sn(_lib.OBJ_obj2nid(ext_object))
        return _ffi.string(short_name)

    def get_data(self):
        """
        Returns the data of the X509 extension, encoded as ASN.1.

        :return: The ASN.1 encoded data of this X509 extension.
        :rtype: :py:data:`bytes`

        .. versionadded:: 0.12
        """
        octets = _lib.X509_EXTENSION_get_data(self._extension)
        as_string = _ffi.cast('ASN1_STRING*', octets)
        data_ptr = _lib.ASN1_STRING_data(as_string)
        data_len = _lib.ASN1_STRING_length(as_string)
        # Slicing the buffer copies the bytes, so embedded NULs survive.
        return _ffi.buffer(data_ptr, data_len)[:]
865
866
# Legacy alias for X509Extension, wrapped with cryptography's ``deprecated``
# helper.
# NOTE(review): presumably emits the DeprecationWarning below when the alias
# is accessed -- confirm against cryptography.utils.deprecated.
X509ExtensionType = deprecated(
    X509Extension, __name__,
    "X509ExtensionType has been deprecated, use X509Extension instead",
    DeprecationWarning
)
872
873
class X509Req(object):
    """
    An X.509 certificate signing request.
    """

    def __init__(self):
        req = _lib.X509_REQ_new()
        # Refuse to wrap a NULL pointer if the allocation failed.
        _openssl_assert(req != _ffi.NULL)
        self._req = _ffi.gc(req, _lib.X509_REQ_free)
        # Default to version 0.
        self.set_version(0)

    def to_cryptography(self):
        """
        Export as a ``cryptography`` certificate signing request.

        :rtype: ``cryptography.x509.CertificateSigningRequest``

        .. versionadded:: 17.1.0
        """
        from cryptography.hazmat.backends.openssl.x509 import (
            _CertificateSigningRequest
        )
        backend = _get_backend()
        return _CertificateSigningRequest(backend, self._req)

    @classmethod
    def from_cryptography(cls, crypto_req):
        """
        Construct based on a ``cryptography`` *crypto_req*.

        :param crypto_req: A ``cryptography`` X.509 certificate signing request
        :type crypto_req: ``cryptography.x509.CertificateSigningRequest``

        :rtype: X509Req

        :raises TypeError: If *crypto_req* is not a
            ``cryptography.x509.CertificateSigningRequest``.

        .. versionadded:: 17.1.0
        """
        if not isinstance(crypto_req, x509.CertificateSigningRequest):
            raise TypeError("Must be a certificate signing request")

        req = cls()
        # Share the native request held by the cryptography object.
        req._req = crypto_req._x509_req
        return req

    def set_pubkey(self, pkey):
        """
        Set the public key of the certificate signing request.

        :param pkey: The public key to use.
        :type pkey: :py:class:`PKey`

        :raises TypeError: If *pkey* is not a :py:class:`PKey`.

        :return: ``None``
        """
        if not isinstance(pkey, PKey):
            raise TypeError("pkey must be a PKey instance")

        set_result = _lib.X509_REQ_set_pubkey(self._req, pkey._pkey)
        _openssl_assert(set_result == 1)

    def get_pubkey(self):
        """
        Get the public key of the certificate signing request.

        :return: The public key.
        :rtype: :py:class:`PKey`
        """
        pkey = PKey.__new__(PKey)
        pkey._pkey = _lib.X509_REQ_get_pubkey(self._req)
        _openssl_assert(pkey._pkey != _ffi.NULL)
        pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free)
        pkey._only_public = True
        return pkey

    def set_version(self, version):
        """
        Set the version subfield (RFC 2459, section 4.1.2.1) of the certificate
        request.

        :param int version: The version number.
        :return: ``None``
        """
        set_result = _lib.X509_REQ_set_version(self._req, version)
        _openssl_assert(set_result == 1)

    def get_version(self):
        """
        Get the version subfield (RFC 2459, section 4.1.2.1) of the certificate
        request.

        :return: The value of the version subfield.
        :rtype: :py:class:`int`
        """
        return _lib.X509_REQ_get_version(self._req)

    def get_subject(self):
        """
        Return the subject of this certificate signing request.

        This creates a new :class:`X509Name` that wraps the underlying subject
        name field on the certificate signing request. Modifying it will modify
        the underlying signing request, and will have the effect of modifying
        any other :class:`X509Name` that refers to this subject.

        :return: The subject of this certificate signing request.
        :rtype: :class:`X509Name`
        """
        name = X509Name.__new__(X509Name)
        name._name = _lib.X509_REQ_get_subject_name(self._req)
        _openssl_assert(name._name != _ffi.NULL)

        # The name is owned by the X509Req structure.  As long as the X509Name
        # Python object is alive, keep the X509Req Python object alive.
        name._owner = self

        return name

    def add_extensions(self, extensions):
        """
        Add extensions to the certificate signing request.

        :param extensions: The X.509 extensions to add.
        :type extensions: iterable of :py:class:`X509Extension`
        :raises ValueError: If an element is not an
            :py:class:`X509Extension`.
        :return: ``None``
        """
        stack = _lib.sk_X509_EXTENSION_new_null()
        _openssl_assert(stack != _ffi.NULL)

        # Only the stack container is freed here; the extensions pushed onto
        # it stay owned by their X509Extension wrappers.
        stack = _ffi.gc(stack, _lib.sk_X509_EXTENSION_free)

        for ext in extensions:
            if not isinstance(ext, X509Extension):
                raise ValueError("One of the elements is not an X509Extension")

            # TODO push can fail (here and elsewhere)
            _lib.sk_X509_EXTENSION_push(stack, ext._extension)

        add_result = _lib.X509_REQ_add_extensions(self._req, stack)
        _openssl_assert(add_result == 1)

    def get_extensions(self):
        """
        Get X.509 extensions in the certificate signing request.

        Each returned :py:class:`X509Extension` owns its native extension
        structure, which is released when the wrapper is garbage collected.

        :return: The X.509 extensions in this request.
        :rtype: :py:class:`list` of :py:class:`X509Extension` objects.

        .. versionadded:: 0.15
        """
        exts = []
        native_exts_obj = _lib.X509_REQ_get_extensions(self._req)
        if native_exts_obj == _ffi.NULL:
            # Nothing was decoded, so there is nothing to wrap or free.
            return exts

        # X509_REQ_get_extensions returns a stack the caller must free.
        # Release the container when this function is done with it; the
        # elements are handed over to the wrappers created below.
        native_exts_obj = _ffi.gc(
            native_exts_obj, _lib.sk_X509_EXTENSION_free)

        for i in range(_lib.sk_X509_EXTENSION_num(native_exts_obj)):
            ext = X509Extension.__new__(X509Extension)
            ext._extension = _ffi.gc(
                _lib.sk_X509_EXTENSION_value(native_exts_obj, i),
                _lib.X509_EXTENSION_free)
            exts.append(ext)
        return exts

    def sign(self, pkey, digest):
        """
        Sign the certificate signing request with this key and digest type.

        :param pkey: The key pair to sign with.
        :type pkey: :py:class:`PKey`
        :param digest: The name of the message digest to use for the signature,
            e.g. :py:data:`b"sha256"`.
        :type digest: :py:class:`bytes`
        :raises TypeError: If *pkey* is not a :py:class:`PKey`.
        :raises ValueError: If the key has only a public part, is
            uninitialized, or the digest name is unknown.
        :return: ``None``
        """
        if not isinstance(pkey, PKey):
            raise TypeError("pkey must be a PKey instance")

        if pkey._only_public:
            raise ValueError("Key has only public part")

        if not pkey._initialized:
            raise ValueError("Key is uninitialized")

        digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest))
        if digest_obj == _ffi.NULL:
            raise ValueError("No such digest method")

        sign_result = _lib.X509_REQ_sign(self._req, pkey._pkey, digest_obj)
        _openssl_assert(sign_result > 0)

    def verify(self, pkey):
        """
        Verifies the signature on this certificate signing request.

        :param PKey pkey: A public key.

        :return: ``True`` if the signature is correct.
        :rtype: bool

        :raises TypeError: If *pkey* is not a :py:class:`PKey`.
        :raises OpenSSL.crypto.Error: If the signature is invalid or there is a
            problem verifying the signature.
        """
        if not isinstance(pkey, PKey):
            raise TypeError("pkey must be a PKey instance")

        result = _lib.X509_REQ_verify(self._req, pkey._pkey)
        if result <= 0:
            _raise_current_error()

        # Return a real bool, as documented, rather than the raw C int.
        return True
1071
1072
# Legacy alias for :class:`X509Req`.  It is registered through
# ``deprecated`` together with a ``DeprecationWarning`` message that points
# users at the new name.
X509ReqType = deprecated(
    X509Req, __name__,
    "X509ReqType has been deprecated, use X509Req instead",
    DeprecationWarning
)
1078
1079
class X509(object):
    """
    An X.509 certificate.
    """
    def __init__(self):
        x509 = _lib.X509_new()
        _openssl_assert(x509 != _ffi.NULL)
        self._x509 = _ffi.gc(x509, _lib.X509_free)

        self._issuer_invalidator = _X509NameInvalidator()
        self._subject_invalidator = _X509NameInvalidator()

    @classmethod
    def _from_raw_x509_ptr(cls, x509):
        """
        Wrap an existing native certificate pointer without running
        ``__init__``.

        The wrapper registers ``X509_free`` for *x509*, so it is released
        when the returned object is garbage collected.
        """
        cert = cls.__new__(cls)
        cert._x509 = _ffi.gc(x509, _lib.X509_free)
        cert._issuer_invalidator = _X509NameInvalidator()
        cert._subject_invalidator = _X509NameInvalidator()
        return cert

    def to_cryptography(self):
        """
        Export as a ``cryptography`` certificate.

        :rtype: ``cryptography.x509.Certificate``

        .. versionadded:: 17.1.0
        """
        from cryptography.hazmat.backends.openssl.x509 import _Certificate
        backend = _get_backend()
        return _Certificate(backend, self._x509)

    @classmethod
    def from_cryptography(cls, crypto_cert):
        """
        Construct based on a ``cryptography`` *crypto_cert*.

        :param crypto_cert: A ``cryptography`` X.509 certificate.
        :type crypto_cert: ``cryptography.x509.Certificate``

        :rtype: X509

        :raises TypeError: If *crypto_cert* is not a
            ``cryptography.x509.Certificate``.

        .. versionadded:: 17.1.0
        """
        if not isinstance(crypto_cert, x509.Certificate):
            raise TypeError("Must be a certificate")

        cert = cls()
        # Share the native certificate held by the cryptography object.
        cert._x509 = crypto_cert._x509
        return cert

    def set_version(self, version):
        """
        Set the version number of the certificate. Note that the
        version value is zero-based, eg. a value of 0 is V1.

        :param version: The version number of the certificate.
        :type version: :py:class:`int`

        :raises TypeError: If *version* is not an integer.

        :return: ``None``
        """
        if not isinstance(version, int):
            raise TypeError("version must be an integer")

        # Surface a failure instead of silently ignoring the return code.
        set_result = _lib.X509_set_version(self._x509, version)
        _openssl_assert(set_result == 1)

    def get_version(self):
        """
        Return the version number of the certificate.

        :return: The version number of the certificate.
        :rtype: :py:class:`int`
        """
        return _lib.X509_get_version(self._x509)

    def get_pubkey(self):
        """
        Get the public key of the certificate.

        :return: The public key.
        :rtype: :py:class:`PKey`
        """
        pkey = PKey.__new__(PKey)
        pkey._pkey = _lib.X509_get_pubkey(self._x509)
        if pkey._pkey == _ffi.NULL:
            _raise_current_error()
        pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free)
        pkey._only_public = True
        return pkey

    def set_pubkey(self, pkey):
        """
        Set the public key of the certificate.

        :param pkey: The public key.
        :type pkey: :py:class:`PKey`

        :raises TypeError: If *pkey* is not a :py:class:`PKey`.

        :return: :py:data:`None`
        """
        if not isinstance(pkey, PKey):
            raise TypeError("pkey must be a PKey instance")

        set_result = _lib.X509_set_pubkey(self._x509, pkey._pkey)
        _openssl_assert(set_result == 1)

    def sign(self, pkey, digest):
        """
        Sign the certificate with this key and digest type.

        :param pkey: The key to sign with.
        :type pkey: :py:class:`PKey`

        :param digest: The name of the message digest to use.
        :type digest: :py:class:`bytes`

        :raises TypeError: If *pkey* is not a :py:class:`PKey`.
        :raises ValueError: If the key has only a public part, is
            uninitialized, or the digest name is unknown.

        :return: :py:data:`None`
        """
        if not isinstance(pkey, PKey):
            raise TypeError("pkey must be a PKey instance")

        if pkey._only_public:
            raise ValueError("Key only has public part")

        if not pkey._initialized:
            raise ValueError("Key is uninitialized")

        evp_md = _lib.EVP_get_digestbyname(_byte_string(digest))
        if evp_md == _ffi.NULL:
            raise ValueError("No such digest method")

        sign_result = _lib.X509_sign(self._x509, pkey._pkey, evp_md)
        _openssl_assert(sign_result > 0)

    def get_signature_algorithm(self):
        """
        Return the signature algorithm used in the certificate.

        :return: The name of the algorithm.
        :rtype: :py:class:`bytes`

        :raises ValueError: If the signature algorithm is undefined.

        .. versionadded:: 0.13
        """
        algor = _lib.X509_get0_tbs_sigalg(self._x509)
        nid = _lib.OBJ_obj2nid(algor.algorithm)
        if nid == _lib.NID_undef:
            raise ValueError("Undefined signature algorithm")
        return _ffi.string(_lib.OBJ_nid2ln(nid))

    def digest(self, digest_name):
        """
        Return the digest of the X509 object.

        :param digest_name: The name of the digest algorithm to use.
        :type digest_name: :py:class:`bytes`

        :return: The digest of the object, formatted as
            :py:const:`b":"`-delimited hex pairs.
        :rtype: :py:class:`bytes`

        :raises ValueError: If the digest name is unknown.
        """
        digest = _lib.EVP_get_digestbyname(_byte_string(digest_name))
        if digest == _ffi.NULL:
            raise ValueError("No such digest method")

        # Output buffer sized for the largest digest, plus an in/out length
        # cell that X509_digest overwrites with the actual digest size.
        result_buffer = _ffi.new("unsigned char[]", _lib.EVP_MAX_MD_SIZE)
        result_length = _ffi.new("unsigned int[]", 1)
        result_length[0] = len(result_buffer)

        digest_result = _lib.X509_digest(
            self._x509, digest, result_buffer, result_length)
        _openssl_assert(digest_result == 1)

        return b":".join([
            b16encode(ch).upper() for ch
            in _ffi.buffer(result_buffer, result_length[0])])

    def subject_name_hash(self):
        """
        Return the hash of the X509 subject.

        :return: The hash of the subject, as returned by
            ``X509_subject_name_hash``.
        :rtype: :py:class:`int`
        """
        return _lib.X509_subject_name_hash(self._x509)

    def set_serial_number(self, serial):
        """
        Set the serial number of the certificate.

        :param serial: The new serial number.  The sign of a negative value
            is preserved.
        :type serial: :py:class:`int`

        :raises TypeError: If *serial* is not an integer.
        :raises OpenSSL.crypto.Error: If OpenSSL fails to convert or store
            the value.

        :return: :py:data:`None`
        """
        if not isinstance(serial, _integer_types):
            raise TypeError("serial must be an integer")

        # "%x" gives bare hex digits, with a leading "-" for negative values
        # and no "0x" prefix or Python 2 "L" suffix.  Slicing hex() output
        # mangles negative numbers ("-0x5"[2:] == "x5").
        hex_serial = "%x" % serial
        if not isinstance(hex_serial, bytes):
            hex_serial = hex_serial.encode('ascii')

        bignum_serial = _ffi.new("BIGNUM**")

        # BN_hex2bn allocates a BIGNUM into bignum_serial[0]; it is left
        # NULL only when parsing failed, so treat that as an error rather
        # than writing a bogus serial.
        _lib.BN_hex2bn(bignum_serial, hex_serial)
        _openssl_assert(bignum_serial[0] != _ffi.NULL)

        asn1_serial = _lib.BN_to_ASN1_INTEGER(bignum_serial[0], _ffi.NULL)
        # The BIGNUM is only an intermediate; free it before error handling
        # so it is released on every path.
        _lib.BN_free(bignum_serial[0])
        if asn1_serial == _ffi.NULL:
            # TODO Not tested
            _raise_current_error()
        asn1_serial = _ffi.gc(asn1_serial, _lib.ASN1_INTEGER_free)
        set_result = _lib.X509_set_serialNumber(self._x509, asn1_serial)
        _openssl_assert(set_result == 1)

    def get_serial_number(self):
        """
        Return the serial number of this certificate.

        :return: The serial number.
        :rtype: int
        """
        asn1_serial = _lib.X509_get_serialNumber(self._x509)
        bignum_serial = _lib.ASN1_INTEGER_to_BN(asn1_serial, _ffi.NULL)
        try:
            hex_serial = _lib.BN_bn2hex(bignum_serial)
            try:
                hexstring_serial = _ffi.string(hex_serial)
                serial = int(hexstring_serial, 16)
                return serial
            finally:
                _lib.OPENSSL_free(hex_serial)
        finally:
            _lib.BN_free(bignum_serial)

    def gmtime_adj_notAfter(self, amount):
        """
        Adjust the time stamp on which the certificate stops being valid.

        :param int amount: The number of seconds by which to adjust the
            timestamp.
        :raises TypeError: If *amount* is not an integer.
        :return: ``None``
        """
        if not isinstance(amount, int):
            raise TypeError("amount must be an integer")

        notAfter = _lib.X509_get_notAfter(self._x509)
        _lib.X509_gmtime_adj(notAfter, amount)

    def gmtime_adj_notBefore(self, amount):
        """
        Adjust the timestamp on which the certificate starts being valid.

        :param int amount: The number of seconds by which to adjust the
            timestamp.
        :raises TypeError: If *amount* is not an integer.
        :return: ``None``
        """
        if not isinstance(amount, int):
            raise TypeError("amount must be an integer")

        notBefore = _lib.X509_get_notBefore(self._x509)
        _lib.X509_gmtime_adj(notBefore, amount)

    def has_expired(self):
        """
        Check whether the certificate has expired.

        The ``notAfter`` timestamp is parsed as ``YYYYMMDDhhmmssZ`` and
        compared against the current UTC time.

        :return: ``True`` if the certificate has expired, ``False`` otherwise.
        :rtype: bool
        """
        time_string = _native(self.get_notAfter())
        not_after = datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ")

        return not_after < datetime.datetime.utcnow()

    def _get_boundary_time(self, which):
        """
        Read the validity timestamp selected by the accessor *which*.
        """
        return _get_asn1_time(which(self._x509))

    def get_notBefore(self):
        """
        Get the timestamp at which the certificate starts being valid.

        The timestamp is formatted as an ASN.1 TIME::

            YYYYMMDDhhmmssZ

        :return: A timestamp string, or ``None`` if there is none.
        :rtype: bytes or NoneType
        """
        return self._get_boundary_time(_lib.X509_get_notBefore)

    def _set_boundary_time(self, which, when):
        """
        Write *when* into the validity timestamp selected by the accessor
        *which*.
        """
        return _set_asn1_time(which(self._x509), when)

    def set_notBefore(self, when):
        """
        Set the timestamp at which the certificate starts being valid.

        The timestamp is formatted as an ASN.1 TIME::

            YYYYMMDDhhmmssZ

        :param bytes when: A timestamp string.
        :return: ``None``
        """
        return self._set_boundary_time(_lib.X509_get_notBefore, when)

    def get_notAfter(self):
        """
        Get the timestamp at which the certificate stops being valid.

        The timestamp is formatted as an ASN.1 TIME::

            YYYYMMDDhhmmssZ

        :return: A timestamp string, or ``None`` if there is none.
        :rtype: bytes or NoneType
        """
        return self._get_boundary_time(_lib.X509_get_notAfter)

    def set_notAfter(self, when):
        """
        Set the timestamp at which the certificate stops being valid.

        The timestamp is formatted as an ASN.1 TIME::

            YYYYMMDDhhmmssZ

        :param bytes when: A timestamp string.
        :return: ``None``
        """
        return self._set_boundary_time(_lib.X509_get_notAfter, when)

    def _get_name(self, which):
        """
        Wrap the name returned by the accessor *which* in an
        :class:`X509Name` that keeps this certificate alive.
        """
        name = X509Name.__new__(X509Name)
        name._name = which(self._x509)
        _openssl_assert(name._name != _ffi.NULL)

        # The name is owned by the X509 structure.  As long as the X509Name
        # Python object is alive, keep the X509 Python object alive.
        name._owner = self

        return name

    def _set_name(self, which, name):
        """
        Store *name* on the certificate through the setter *which*.

        :raises TypeError: If *name* is not an :class:`X509Name`.
        """
        if not isinstance(name, X509Name):
            raise TypeError("name must be an X509Name")
        set_result = which(self._x509, name._name)
        _openssl_assert(set_result == 1)

    def get_issuer(self):
        """
        Return the issuer of this certificate.

        This creates a new :class:`X509Name` that wraps the underlying issuer
        name field on the certificate. Modifying it will modify the underlying
        certificate, and will have the effect of modifying any other
        :class:`X509Name` that refers to this issuer.

        :return: The issuer of this certificate.
        :rtype: :class:`X509Name`
        """
        name = self._get_name(_lib.X509_get_issuer_name)
        # Track the wrapper so set_issuer can clear it later.
        self._issuer_invalidator.add(name)
        return name

    def set_issuer(self, issuer):
        """
        Set the issuer of this certificate.

        :param issuer: The issuer.
        :type issuer: :py:class:`X509Name`

        :return: ``None``
        """
        self._set_name(_lib.X509_set_issuer_name, issuer)
        self._issuer_invalidator.clear()

    def get_subject(self):
        """
        Return the subject of this certificate.

        This creates a new :class:`X509Name` that wraps the underlying subject
        name field on the certificate. Modifying it will modify the underlying
        certificate, and will have the effect of modifying any other
        :class:`X509Name` that refers to this subject.

        :return: The subject of this certificate.
        :rtype: :class:`X509Name`
        """
        name = self._get_name(_lib.X509_get_subject_name)
        # Track the wrapper so set_subject can clear it later.
        self._subject_invalidator.add(name)
        return name

    def set_subject(self, subject):
        """
        Set the subject of this certificate.

        :param subject: The subject.
        :type subject: :py:class:`X509Name`

        :return: ``None``
        """
        self._set_name(_lib.X509_set_subject_name, subject)
        self._subject_invalidator.clear()

    def get_extension_count(self):
        """
        Get the number of extensions on this certificate.

        :return: The number of extensions.
        :rtype: :py:class:`int`

        .. versionadded:: 0.12
        """
        return _lib.X509_get_ext_count(self._x509)

    def add_extensions(self, extensions):
        """
        Add extensions to the certificate.

        :param extensions: The extensions to add.
        :type extensions: An iterable of :py:class:`X509Extension` objects.
        :raises ValueError: If an element is not an
            :py:class:`X509Extension`.
        :return: ``None``
        """
        for ext in extensions:
            if not isinstance(ext, X509Extension):
                raise ValueError("One of the elements is not an X509Extension")

            # A location of -1 appends the extension.
            add_result = _lib.X509_add_ext(self._x509, ext._extension, -1)
            if not add_result:
                _raise_current_error()

    def get_extension(self, index):
        """
        Get a specific extension of the certificate by index.

        Extensions on a certificate are kept in order. The index
        parameter selects which extension will be returned.

        :param int index: The index of the extension to retrieve.
        :return: The extension at the specified index.
        :rtype: :py:class:`X509Extension`
        :raises IndexError: If the extension index was out of bounds.

        .. versionadded:: 0.12
        """
        ext = X509Extension.__new__(X509Extension)
        ext._extension = _lib.X509_get_ext(self._x509, index)
        if ext._extension == _ffi.NULL:
            raise IndexError("extension index out of bounds")

        # Hand out an independent copy so the wrapper owns its own structure.
        extension = _lib.X509_EXTENSION_dup(ext._extension)
        ext._extension = _ffi.gc(extension, _lib.X509_EXTENSION_free)
        return ext
1544
1545
# Legacy alias for :class:`X509`.  It is registered through ``deprecated``
# together with a ``DeprecationWarning`` message that points users at the
# new name.
X509Type = deprecated(
    X509, __name__,
    "X509Type has been deprecated, use X509 instead",
    DeprecationWarning
)
1551
1552
class X509StoreFlags(object):
    """
    Flags for X509 verification, used to change the behavior of
    :class:`X509Store`.

    See `OpenSSL Verification Flags`_ for details.

    .. _OpenSSL Verification Flags:
        https://www.openssl.org/docs/manmaster/man3/X509_VERIFY_PARAM_set_flags.html
    """
    # Each attribute re-exports the ``X509_V_FLAG_*`` constant of the same
    # name from the OpenSSL bindings (``_lib``).  Values are meant to be
    # passed to :meth:`X509Store.set_flags`.
    CRL_CHECK = _lib.X509_V_FLAG_CRL_CHECK
    CRL_CHECK_ALL = _lib.X509_V_FLAG_CRL_CHECK_ALL
    IGNORE_CRITICAL = _lib.X509_V_FLAG_IGNORE_CRITICAL
    X509_STRICT = _lib.X509_V_FLAG_X509_STRICT
    ALLOW_PROXY_CERTS = _lib.X509_V_FLAG_ALLOW_PROXY_CERTS
    POLICY_CHECK = _lib.X509_V_FLAG_POLICY_CHECK
    EXPLICIT_POLICY = _lib.X509_V_FLAG_EXPLICIT_POLICY
    INHIBIT_MAP = _lib.X509_V_FLAG_INHIBIT_MAP
    NOTIFY_POLICY = _lib.X509_V_FLAG_NOTIFY_POLICY
    CHECK_SS_SIGNATURE = _lib.X509_V_FLAG_CHECK_SS_SIGNATURE
    CB_ISSUER_CHECK = _lib.X509_V_FLAG_CB_ISSUER_CHECK
1574
1575
class X509Store(object):
    """
    An X.509 store.

    An X.509 store is used to describe a context in which to verify a
    certificate. A description of a context may include a set of certificates
    to trust, a set of certificate revocation lists, verification flags and
    more.

    An X.509 store, being only a description, cannot be used by itself to
    verify a certificate. To carry out the actual verification process, see
    :class:`X509StoreContext`.
    """

    def __init__(self):
        store = _lib.X509_STORE_new()
        # Refuse to wrap a NULL pointer if the allocation failed.
        _openssl_assert(store != _ffi.NULL)
        self._store = _ffi.gc(store, _lib.X509_STORE_free)

    def add_cert(self, cert):
        """
        Adds a trusted certificate to this store.

        Adding a certificate with this method adds this certificate as a
        *trusted* certificate.

        :param X509 cert: The certificate to add to this store.

        :raises TypeError: If the certificate is not an :class:`X509`.

        :raises OpenSSL.crypto.Error: If OpenSSL was unhappy with your
            certificate.

        :return: ``None`` if the certificate was added successfully.
        """
        if not isinstance(cert, X509):
            raise TypeError()

        # As of OpenSSL 1.1.0i adding the same cert to the store more than
        # once doesn't cause an error. Accordingly, this code now silences
        # the error for OpenSSL < 1.1.0i as well.
        if _lib.X509_STORE_add_cert(self._store, cert._x509) == 0:
            code = _lib.ERR_peek_error()
            err_reason = _lib.ERR_GET_REASON(code)
            # Any reason other than "already present" is a real failure.
            _openssl_assert(
                err_reason == _lib.X509_R_CERT_ALREADY_IN_HASH_TABLE
            )
            _lib.ERR_clear_error()

    def add_crl(self, crl):
        """
        Add a certificate revocation list to this store.

        The certificate revocation lists added to a store will only be used if
        the associated flags are configured to check certificate revocation
        lists.

        .. versionadded:: 16.1.0

        :param CRL crl: The certificate revocation list to add to this store.
        :return: ``None`` if the certificate revocation list was added
            successfully.
        """
        _openssl_assert(_lib.X509_STORE_add_crl(self._store, crl._crl) != 0)

    def set_flags(self, flags):
        """
        Set verification flags to this store.

        Verification flags can be combined by oring them together.

        .. note::

          Setting a verification flag sometimes requires clients to add
          additional information to the store, otherwise a suitable error will
          be raised.

          For example, in setting flags to enable CRL checking a
          suitable CRL must be added to the store otherwise an error will be
          raised.

        .. versionadded:: 16.1.0

        :param int flags: The verification flags to set on this store.
            See :class:`X509StoreFlags` for available constants.
        :return: ``None`` if the verification flags were successfully set.
        """
        _openssl_assert(_lib.X509_STORE_set_flags(self._store, flags) != 0)

    def set_time(self, vfy_time):
        """
        Set the time against which the certificates are verified.

        Normally the current time is used.

        .. note::

          For example, you can determine if a certificate was valid at a given
          time.

        The fields of *vfy_time* are interpreted as local wall-clock time
        (via :func:`time.mktime`) when converting to a POSIX timestamp.

        .. versionadded:: 17.0.0

        :param datetime vfy_time: The verification time to set on this store.
        :return: ``None`` if the verification time was successfully set.
        """
        # Imported locally so this method does not depend on a module-level
        # import of ``time``.
        import time

        param = _lib.X509_VERIFY_PARAM_new()
        param = _ffi.gc(param, _lib.X509_VERIFY_PARAM_free)

        # strftime('%s') is a platform-specific extension and is not
        # available everywhere; mktime() on the time tuple performs the same
        # local-time conversion portably.
        timestamp = int(time.mktime(vfy_time.timetuple()))
        _lib.X509_VERIFY_PARAM_set_time(param, timestamp)
        _openssl_assert(_lib.X509_STORE_set1_param(self._store, param) != 0)
1685
1686
# Legacy alias for :class:`X509Store`.  It is registered through
# ``deprecated`` together with a ``DeprecationWarning`` message that points
# users at the new name.
X509StoreType = deprecated(
    X509Store, __name__,
    "X509StoreType has been deprecated, use X509Store instead",
    DeprecationWarning
)
1692
1693
class X509StoreContextError(Exception):
    """
    Raised when verifying a certificate with
    `OpenSSL.X509StoreContext.verify_certificate` fails.

    :ivar certificate: The certificate which caused verification failure.
    :type certificate: :class:`X509`
    """

    def __init__(self, message, certificate):
        # Remember the offending certificate, then let ``Exception`` keep
        # *message* as its only positional argument.
        self.certificate = certificate
        super(X509StoreContextError, self).__init__(message)
1706
1707
class X509StoreContext(object):
    """
    An X.509 store context.

    An X.509 store context is used to carry out the actual verification process
    of a certificate in a described context. For describing such a context, see
    :class:`X509Store`.

    :ivar _store_ctx: The underlying X509_STORE_CTX structure used by this
        instance.  It is dynamically allocated and automatically garbage
        collected.
    :ivar _store: See the ``store`` ``__init__`` parameter.
    :ivar _cert: See the ``certificate`` ``__init__`` parameter.
    :param X509Store store: The certificates which will be trusted for the
        purposes of any verifications.
    :param X509 certificate: The certificate to be verified.
    """

    def __init__(self, store, certificate):
        store_ctx = _lib.X509_STORE_CTX_new()
        # Refuse to wrap a NULL pointer if the allocation failed.
        _openssl_assert(store_ctx != _ffi.NULL)
        self._store_ctx = _ffi.gc(store_ctx, _lib.X509_STORE_CTX_free)
        self._store = store
        self._cert = certificate
        # Make the store context available for use after instantiating this
        # class by initializing it now. Per testing, subsequent calls to
        # :meth:`_init` have no adverse affect.
        self._init()

    def _init(self):
        """
        Set up the store context for a subsequent verification operation.

        Calling this method more than once without first calling
        :meth:`_cleanup` will leak memory.

        :raises OpenSSL.crypto.Error: If OpenSSL fails to initialize the
            context.
        """
        ret = _lib.X509_STORE_CTX_init(
            self._store_ctx, self._store._store, self._cert._x509, _ffi.NULL
        )
        if ret <= 0:
            _raise_current_error()

    def _cleanup(self):
        """
        Internally cleans up the store context.

        The store context can then be reused with a new call to :meth:`_init`.
        """
        _lib.X509_STORE_CTX_cleanup(self._store_ctx)

    def _exception_from_context(self):
        """
        Convert an OpenSSL native context error failure into a Python
        exception.

        When a call to native OpenSSL X509_verify_cert fails, additional
        information about the failure can be obtained from the store context.

        :return: An exception whose message is the list
            ``[error code, error depth, error string]`` and whose
            ``certificate`` is a copy of the context's current certificate.
        :rtype: :class:`X509StoreContextError`
        """
        # Fetch the code once; it is both reported and used to look up the
        # human-readable description.
        error_code = _lib.X509_STORE_CTX_get_error(self._store_ctx)
        errors = [
            error_code,
            _lib.X509_STORE_CTX_get_error_depth(self._store_ctx),
            _native(_ffi.string(
                _lib.X509_verify_cert_error_string(error_code))),
        ]
        # A context error should always be associated with a certificate, so we
        # expect this call to never return :class:`None`.
        _x509 = _lib.X509_STORE_CTX_get_current_cert(self._store_ctx)
        # Duplicate the certificate so the Python wrapper owns its own copy.
        _cert = _lib.X509_dup(_x509)
        pycert = X509._from_raw_x509_ptr(_cert)
        return X509StoreContextError(errors, pycert)

    def set_store(self, store):
        """
        Set the context's X.509 store.

        .. versionadded:: 0.15

        :param X509Store store: The store description which will be used for
            the purposes of any *future* verifications.
        """
        self._store = store

    def verify_certificate(self):
        """
        Verify a certificate in a context.

        .. versionadded:: 0.15

        :raises X509StoreContextError: If an error occurred when validating a
          certificate in the context. Sets ``certificate`` attribute to
          indicate which certificate caused the error.
        """
        # Always re-initialize the store context in case
        # :meth:`verify_certificate` is called multiple times.
        #
        # :meth:`_init` is called in :meth:`__init__` so _cleanup is called
        # before _init to ensure memory is not leaked.
        self._cleanup()
        self._init()
        try:
            if _lib.X509_verify_cert(self._store_ctx) <= 0:
                # Build the exception while the context still holds the
                # verification state, i.e. before it is cleaned up.
                raise self._exception_from_context()
        finally:
            self._cleanup()
1810
1811
def load_certificate(type, buffer):
    """
    Parse a certificate (X509) out of *buffer*, which is encoded as *type*.

    Text input is first encoded to bytes using ASCII.

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)

    :param bytes buffer: The buffer the certificate is stored in

    :return: The X509 object
    :raises ValueError: If *type* is neither FILETYPE_PEM nor FILETYPE_ASN1.
    """
    data = buffer.encode("ascii") if isinstance(buffer, _text_type) else buffer
    bio = _new_mem_buf(data)

    if type == FILETYPE_PEM:
        raw_cert = _lib.PEM_read_bio_X509(
            bio, _ffi.NULL, _ffi.NULL, _ffi.NULL
        )
    elif type == FILETYPE_ASN1:
        raw_cert = _lib.d2i_X509_bio(bio, _ffi.NULL)
    else:
        raise ValueError(
            "type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    # A NULL pointer means OpenSSL could not produce a certificate.
    if raw_cert == _ffi.NULL:
        _raise_current_error()

    return X509._from_raw_x509_ptr(raw_cert)
1840
1841
def dump_certificate(type, cert):
    """
    Dump the certificate *cert* into a buffer string encoded with the type
    *type*.

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1, or
        FILETYPE_TEXT)
    :param cert: The certificate to dump
    :return: The buffer with the dumped certificate in
    :raises ValueError: If *type* is not one of the supported file types.
    """
    bio = _new_mem_buf()

    if type == FILETYPE_PEM:
        result_code = _lib.PEM_write_bio_X509(bio, cert._x509)
    elif type == FILETYPE_ASN1:
        result_code = _lib.i2d_X509_bio(bio, cert._x509)
    elif type == FILETYPE_TEXT:
        result_code = _lib.X509_print_ex(bio, cert._x509, 0, 0)
    else:
        raise ValueError(
            "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
            "FILETYPE_TEXT")

    # Use the module's OpenSSL assertion helper instead of a bare ``assert``:
    # plain asserts are stripped when Python runs with -O, which would let a
    # failed write go unnoticed and return a truncated or empty buffer.
    _openssl_assert(result_code == 1)
    return _bio_to_string(bio)
1867
1868
def dump_publickey(type, pkey):
    """
    Serialize a public key into a byte buffer.

    :param type: The file type (one of :data:`FILETYPE_PEM` or
        :data:`FILETYPE_ASN1`).
    :param PKey pkey: The public key to dump
    :return: The buffer with the dumped key in it.
    :rtype: bytes
    :raises ValueError: If *type* is neither of the supported file types.
    """
    bio = _new_mem_buf()

    if type not in (FILETYPE_PEM, FILETYPE_ASN1):
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    # Select the OpenSSL writer matching the requested encoding.
    if type == FILETYPE_PEM:
        writer = _lib.PEM_write_bio_PUBKEY
    else:
        writer = _lib.i2d_PUBKEY_bio

    if writer(bio, pkey._pkey) != 1:  # pragma: no cover
        _raise_current_error()

    return _bio_to_string(bio)
1892
1893
def dump_privatekey(type, pkey, cipher=None, passphrase=None):
    """
    Serialize the private key *pkey* into a byte buffer encoded as *type*.

    When *type* is :const:`FILETYPE_PEM` the output may additionally be
    encrypted using *cipher* and *passphrase*.

    :param type: The file type (one of :const:`FILETYPE_PEM`,
        :const:`FILETYPE_ASN1`, or :const:`FILETYPE_TEXT`)
    :param PKey pkey: The PKey to dump
    :param cipher: (optional) if encrypted PEM format, the cipher to use
    :param passphrase: (optional) if encrypted PEM format, this can be either
        the passphrase to use, or a callback for providing the passphrase.

    :return: The buffer with the dumped key in
    :rtype: bytes

    :raises TypeError: If *pkey* is not a :class:`PKey`, if *cipher* is given
        without *passphrase*, or if a non-RSA key is dumped as
        :const:`FILETYPE_TEXT`.
    :raises ValueError: If the cipher name is unknown or *type* is not one
        of the supported file types.
    """
    bio = _new_mem_buf()

    if not isinstance(pkey, PKey):
        raise TypeError("pkey must be a PKey")

    # No cipher means "do not encrypt", expressed to OpenSSL as NULL.
    cipher_obj = _ffi.NULL
    if cipher is not None:
        if passphrase is None:
            raise TypeError(
                "if a value is given for cipher "
                "one must also be given for passphrase")
        cipher_obj = _lib.EVP_get_cipherbyname(_byte_string(cipher))
        if cipher_obj == _ffi.NULL:
            raise ValueError("Invalid cipher name")

    helper = _PassphraseHelper(type, passphrase)
    if type == FILETYPE_PEM:
        result_code = _lib.PEM_write_bio_PrivateKey(
            bio, pkey._pkey, cipher_obj, _ffi.NULL, 0,
            helper.callback, helper.callback_args)
        # Surface any problem recorded by the passphrase helper.
        helper.raise_if_problem()
    elif type == FILETYPE_ASN1:
        result_code = _lib.i2d_PrivateKey_bio(bio, pkey._pkey)
    elif type == FILETYPE_TEXT:
        key_kind = _lib.EVP_PKEY_id(pkey._pkey)
        if key_kind != _lib.EVP_PKEY_RSA:
            raise TypeError("Only RSA keys are supported for FILETYPE_TEXT")

        rsa_key = _ffi.gc(_lib.EVP_PKEY_get1_RSA(pkey._pkey), _lib.RSA_free)
        result_code = _lib.RSA_print(bio, rsa_key, 0)
    else:
        raise ValueError(
            "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
            "FILETYPE_TEXT")

    _openssl_assert(result_code != 0)

    return _bio_to_string(bio)
1951
1952
class Revoked(object):
    """
    A certificate revocation.

    Wraps a native ``X509_REVOKED`` structure, which is released
    automatically when the Python object is garbage collected.
    """
    # https://www.openssl.org/docs/manmaster/man5/x509v3_config.html#CRL-distribution-points
    # which differs from crl_reasons of crypto/x509v3/v3_enum.c that matches
    # OCSP_crl_reason_str.  We use the latter, just like the command line
    # program.
    #
    # The position of a name in this list is the numeric reason code that
    # :meth:`set_reason` stores.
    _crl_reasons = [
        b"unspecified",
        b"keyCompromise",
        b"CACompromise",
        b"affiliationChanged",
        b"superseded",
        b"cessationOfOperation",
        b"certificateHold",
        # b"removeFromCRL",
    ]

    def __init__(self):
        revoked = _lib.X509_REVOKED_new()
        self._revoked = _ffi.gc(revoked, _lib.X509_REVOKED_free)

    def set_serial(self, hex_str):
        """
        Set the serial number.

        The serial number is formatted as a hexadecimal number encoded in
        ASCII.

        :param bytes hex_str: The new serial number.

        :return: ``None``
        :raises ValueError: If *hex_str* cannot be parsed as hexadecimal.
        """
        bignum_serial = _ffi.gc(_lib.BN_new(), _lib.BN_free)
        bignum_ptr = _ffi.new("BIGNUM**")
        bignum_ptr[0] = bignum_serial
        bn_result = _lib.BN_hex2bn(bignum_ptr, hex_str)
        if not bn_result:
            raise ValueError("bad hex string")

        asn1_serial = _ffi.gc(
            _lib.BN_to_ASN1_INTEGER(bignum_serial, _ffi.NULL),
            _lib.ASN1_INTEGER_free)
        _lib.X509_REVOKED_set_serialNumber(self._revoked, asn1_serial)

    def get_serial(self):
        """
        Get the serial number.

        The serial number is formatted as a hexadecimal number encoded in
        ASCII.

        :return: The serial number.
        :rtype: bytes
        """
        bio = _new_mem_buf()

        asn1_int = _lib.X509_REVOKED_get0_serialNumber(self._revoked)
        _openssl_assert(asn1_int != _ffi.NULL)
        result = _lib.i2a_ASN1_INTEGER(bio, asn1_int)
        _openssl_assert(result >= 0)
        return _bio_to_string(bio)

    def _delete_reason(self):
        """
        Remove the first CRL-reason extension from this revocation, if any.
        """
        for i in range(_lib.X509_REVOKED_get_ext_count(self._revoked)):
            ext = _lib.X509_REVOKED_get_ext(self._revoked, i)
            obj = _lib.X509_EXTENSION_get_object(ext)
            if _lib.OBJ_obj2nid(obj) == _lib.NID_crl_reason:
                _lib.X509_EXTENSION_free(ext)
                _lib.X509_REVOKED_delete_ext(self._revoked, i)
                break

    def set_reason(self, reason):
        """
        Set the reason of this revocation.

        If :data:`reason` is ``None``, delete the reason instead.

        The reason is matched case-insensitively and with spaces removed
        against the names returned by :meth:`all_reasons`.

        :param reason: The reason string.
        :type reason: :class:`bytes` or :class:`NoneType`

        :return: ``None``

        :raises TypeError: If *reason* is neither ``None`` nor a byte string.
        :raises ValueError: If *reason* is not one of the supported reasons.

        .. seealso::

            :meth:`all_reasons`, which gives you a list of all supported
            reasons which you might pass to this method.
        """
        if reason is None:
            self._delete_reason()
        elif not isinstance(reason, bytes):
            raise TypeError("reason must be None or a byte string")
        else:
            reason = reason.lower().replace(b' ', b'')
            # ``list.index`` raises ValueError for an unknown reason name.
            reason_code = [r.lower() for r in self._crl_reasons].index(reason)

            new_reason_ext = _lib.ASN1_ENUMERATED_new()
            _openssl_assert(new_reason_ext != _ffi.NULL)
            new_reason_ext = _ffi.gc(new_reason_ext, _lib.ASN1_ENUMERATED_free)

            # ASN1_ENUMERATED_set reports failure with an integer 0, not a
            # pointer, so the result must be compared with 0; a comparison
            # against NULL could never detect a failure.
            set_result = _lib.ASN1_ENUMERATED_set(new_reason_ext, reason_code)
            _openssl_assert(set_result != 0)

            # Drop any previous reason so at most one is ever present.
            self._delete_reason()
            add_result = _lib.X509_REVOKED_add1_ext_i2d(
                self._revoked, _lib.NID_crl_reason, new_reason_ext, 0, 0)
            _openssl_assert(add_result == 1)

    def get_reason(self):
        """
        Get the reason of this revocation.

        :return: The reason, or ``None`` if there is none.
        :rtype: bytes or NoneType

        .. seealso::

            :meth:`all_reasons`, which gives you a list of all supported
            reasons this method might return.
        """
        for i in range(_lib.X509_REVOKED_get_ext_count(self._revoked)):
            ext = _lib.X509_REVOKED_get_ext(self._revoked, i)
            obj = _lib.X509_EXTENSION_get_object(ext)
            if _lib.OBJ_obj2nid(obj) == _lib.NID_crl_reason:
                bio = _new_mem_buf()

                print_result = _lib.X509V3_EXT_print(bio, ext, 0, 0)
                if not print_result:
                    # Fall back to printing the raw extension data.
                    print_result = _lib.M_ASN1_OCTET_STRING_print(
                        bio, _lib.X509_EXTENSION_get_data(ext)
                    )
                    _openssl_assert(print_result != 0)

                return _bio_to_string(bio)

    def all_reasons(self):
        """
        Return a list of all the supported reason strings.

        This list is a copy; modifying it does not change the supported reason
        strings.

        :return: A list of reason strings.
        :rtype: :class:`list` of :class:`bytes`
        """
        return self._crl_reasons[:]

    def set_rev_date(self, when):
        """
        Set the revocation timestamp.

        :param bytes when: The timestamp of the revocation,
            as ASN.1 TIME.
        :return: ``None``
        """
        dt = _lib.X509_REVOKED_get0_revocationDate(self._revoked)
        return _set_asn1_time(dt, when)

    def get_rev_date(self):
        """
        Get the revocation timestamp.

        :return: The timestamp of the revocation, as ASN.1 TIME.
        :rtype: bytes
        """
        dt = _lib.X509_REVOKED_get0_revocationDate(self._revoked)
        return _get_asn1_time(dt)
2121
2122
class CRL(object):
    """
    A certificate revocation list.

    Wraps a native ``X509_CRL`` structure, which is released automatically
    when the Python object is garbage collected.
    """

    def __init__(self):
        crl = _lib.X509_CRL_new()
        self._crl = _ffi.gc(crl, _lib.X509_CRL_free)

    def to_cryptography(self):
        """
        Export as a ``cryptography`` CRL.

        :rtype: ``cryptography.x509.CertificateRevocationList``

        .. versionadded:: 17.1.0
        """
        from cryptography.hazmat.backends.openssl.x509 import (
            _CertificateRevocationList
        )
        backend = _get_backend()
        return _CertificateRevocationList(backend, self._crl)

    @classmethod
    def from_cryptography(cls, crypto_crl):
        """
        Construct based on a ``cryptography`` *crypto_crl*.

        :param crypto_crl: A ``cryptography`` certificate revocation list
        :type crypto_crl: ``cryptography.x509.CertificateRevocationList``

        :rtype: CRL

        :raises TypeError: If *crypto_crl* is not a
            ``cryptography.x509.CertificateRevocationList``.

        .. versionadded:: 17.1.0
        """
        if not isinstance(crypto_crl, x509.CertificateRevocationList):
            raise TypeError("Must be a certificate revocation list")

        crl = cls()
        crl._crl = crypto_crl._x509_crl
        return crl

    def get_revoked(self):
        """
        Return the revocations in this certificate revocation list.

        These revocations will be provided by value, not by reference.
        That means it's okay to mutate them: it won't affect this CRL.

        :return: The revocations in this CRL, or ``None`` if the CRL
            contains no revocations.
        :rtype: :class:`tuple` of :class:`Revoked`, or ``None``
        """
        results = []
        revoked_stack = _lib.X509_CRL_get_REVOKED(self._crl)
        for i in range(_lib.sk_X509_REVOKED_num(revoked_stack)):
            revoked = _lib.sk_X509_REVOKED_value(revoked_stack, i)
            revoked_copy = _lib.Cryptography_X509_REVOKED_dup(revoked)
            # Bypass ``Revoked.__init__`` so no extra native structure is
            # allocated; the wrapper owns the duplicate instead.
            pyrev = Revoked.__new__(Revoked)
            pyrev._revoked = _ffi.gc(revoked_copy, _lib.X509_REVOKED_free)
            results.append(pyrev)
        if results:
            return tuple(results)

    def add_revoked(self, revoked):
        """
        Add a revoked (by value not reference) to the CRL structure

        This revocation will be added by value, not by reference. That
        means it's okay to mutate it after adding: it won't affect
        this CRL.

        :param Revoked revoked: The new revocation.
        :return: ``None``
        """
        copy = _lib.Cryptography_X509_REVOKED_dup(revoked._revoked)
        _openssl_assert(copy != _ffi.NULL)

        add_result = _lib.X509_CRL_add0_revoked(self._crl, copy)
        _openssl_assert(add_result != 0)

    def get_issuer(self):
        """
        Get the CRL's issuer.

        .. versionadded:: 16.1.0

        :rtype: X509Name
        """
        _issuer = _lib.X509_NAME_dup(_lib.X509_CRL_get_issuer(self._crl))
        _openssl_assert(_issuer != _ffi.NULL)
        _issuer = _ffi.gc(_issuer, _lib.X509_NAME_free)
        issuer = X509Name.__new__(X509Name)
        issuer._name = _issuer
        return issuer

    def set_version(self, version):
        """
        Set the CRL version.

        .. versionadded:: 16.1.0

        :param int version: The version of the CRL.
        :return: ``None``
        """
        _openssl_assert(_lib.X509_CRL_set_version(self._crl, version) != 0)

    def _set_boundary_time(self, which, when):
        """
        Set one of the CRL's time fields.

        :param which: A callable that, given the native CRL, returns the
            time field to update.
        :param bytes when: A timestamp string.
        """
        return _set_asn1_time(which(self._crl), when)

    def set_lastUpdate(self, when):
        """
        Set when the CRL was last updated.

        The timestamp is formatted as an ASN.1 TIME::

            YYYYMMDDhhmmssZ

        .. versionadded:: 16.1.0

        :param bytes when: A timestamp string.
        :return: ``None``
        """
        return self._set_boundary_time(_lib.X509_CRL_get_lastUpdate, when)

    def set_nextUpdate(self, when):
        """
        Set when the CRL will next be updated.

        The timestamp is formatted as an ASN.1 TIME::

            YYYYMMDDhhmmssZ

        .. versionadded:: 16.1.0

        :param bytes when: A timestamp string.
        :return: ``None``
        """
        return self._set_boundary_time(_lib.X509_CRL_get_nextUpdate, when)

    def sign(self, issuer_cert, issuer_key, digest):
        """
        Sign the CRL.

        Signing a CRL enables clients to associate the CRL itself with an
        issuer. Before a CRL is meaningful to other OpenSSL functions, it must
        be signed by an issuer.

        This method implicitly sets the issuer's name based on the issuer
        certificate and private key used to sign the CRL.

        .. versionadded:: 16.1.0

        :param X509 issuer_cert: The issuer's certificate.
        :param PKey issuer_key: The issuer's private key.
        :param bytes digest: The digest method to sign the CRL with.
        """
        digest_obj = _lib.EVP_get_digestbyname(digest)
        _openssl_assert(digest_obj != _ffi.NULL)
        _lib.X509_CRL_set_issuer_name(
            self._crl, _lib.X509_get_subject_name(issuer_cert._x509))
        _lib.X509_CRL_sort(self._crl)
        result = _lib.X509_CRL_sign(self._crl, issuer_key._pkey, digest_obj)
        _openssl_assert(result != 0)

    def export(self, cert, key, type=FILETYPE_PEM, days=100,
               digest=_UNSPECIFIED):
        """
        Export the CRL as a string.

        As part of exporting, the CRL's last-update time is set to now, its
        next-update time to *days* days from now, its issuer to the subject
        of *cert*, and it is signed with *key*.

        :param X509 cert: The certificate used to sign the CRL.
        :param PKey key: The key used to sign the CRL.
        :param int type: The export format, either :data:`FILETYPE_PEM`,
            :data:`FILETYPE_ASN1`, or :data:`FILETYPE_TEXT`.
        :param int days: The number of days until the next update of this CRL.
        :param bytes digest: The name of the message digest to use (eg
            ``b"sha256"``).
        :rtype: bytes

        :raises TypeError: If *cert*, *key* or *type* has the wrong type, or
            if *digest* is not provided.
        :raises ValueError: If *digest* does not name a known digest.
        """

        if not isinstance(cert, X509):
            raise TypeError("cert must be an X509 instance")
        if not isinstance(key, PKey):
            raise TypeError("key must be a PKey instance")
        if not isinstance(type, int):
            raise TypeError("type must be an integer")

        if digest is _UNSPECIFIED:
            raise TypeError("digest must be provided")

        digest_obj = _lib.EVP_get_digestbyname(digest)
        if digest_obj == _ffi.NULL:
            raise ValueError("No such digest method")

        # A scratch time object to give different values to different CRL
        # fields.  It is only a temporary, so tie its lifetime to the Python
        # reference instead of leaking it on every export.
        sometime = _lib.ASN1_TIME_new()
        _openssl_assert(sometime != _ffi.NULL)
        sometime = _ffi.gc(sometime, _lib.ASN1_TIME_free)

        _lib.X509_gmtime_adj(sometime, 0)
        _lib.X509_CRL_set_lastUpdate(self._crl, sometime)

        _lib.X509_gmtime_adj(sometime, days * 24 * 60 * 60)
        _lib.X509_CRL_set_nextUpdate(self._crl, sometime)

        _lib.X509_CRL_set_issuer_name(
            self._crl, _lib.X509_get_subject_name(cert._x509)
        )

        sign_result = _lib.X509_CRL_sign(self._crl, key._pkey, digest_obj)
        if not sign_result:
            _raise_current_error()

        # The serialized output is produced by ``dump_crl``, which manages
        # its own buffer; no BIO needs to be allocated here.
        return dump_crl(type, self)
2339
2340
# Deprecated alias: ``CRLType`` is ``CRL`` wrapped by
# ``cryptography.utils.deprecated`` together with the warning message and
# category below.
CRLType = deprecated(
    CRL,
    __name__,
    "CRLType has been deprecated, use CRL instead",
    DeprecationWarning,
)
2346
2347
class PKCS7(object):
    """
    Type queries on a native PKCS7 structure.

    The class defines no ``__init__``; every method reads ``self._pkcs7``,
    which therefore has to be assigned by whatever code creates the
    instance.
    """

    def type_is_signed(self):
        """
        Tell whether this is an NID_pkcs7_signed object.

        :return: True if the PKCS7 is of type signed
        """
        answer = _lib.PKCS7_type_is_signed(self._pkcs7)
        return bool(answer)

    def type_is_enveloped(self):
        """
        Tell whether this is an NID_pkcs7_enveloped object.

        :returns: True if the PKCS7 is of type enveloped
        """
        answer = _lib.PKCS7_type_is_enveloped(self._pkcs7)
        return bool(answer)

    def type_is_signedAndEnveloped(self):
        """
        Tell whether this is an NID_pkcs7_signedAndEnveloped object.

        :returns: True if the PKCS7 is of type signedAndEnveloped
        """
        answer = _lib.PKCS7_type_is_signedAndEnveloped(self._pkcs7)
        return bool(answer)

    def type_is_data(self):
        """
        Tell whether this is an NID_pkcs7_data object.

        :return: True if the PKCS7 is of type data
        """
        answer = _lib.PKCS7_type_is_data(self._pkcs7)
        return bool(answer)

    def get_type_name(self):
        """
        Look up the short name of this PKCS7 structure's type.

        :return: A string with the typename
        """
        short_name = _lib.OBJ_nid2sn(_lib.OBJ_obj2nid(self._pkcs7.type))
        return _ffi.string(short_name)
2390
2391
# Deprecated alias: ``PKCS7Type`` is ``PKCS7`` wrapped by
# ``cryptography.utils.deprecated`` together with the warning message and
# category below.
PKCS7Type = deprecated(
    PKCS7,
    __name__,
    "PKCS7Type has been deprecated, use PKCS7 instead",
    DeprecationWarning,
)
2397
2398
class PKCS12(object):
    """
    A PKCS #12 archive.

    The parts of the archive (private key, certificate, CA certificates and
    friendly name) are held as Python attributes and only combined into a
    native structure by :meth:`export`.
    """

    def __init__(self):
        self._pkey = None
        self._cert = None
        self._cacerts = None
        self._friendlyname = None

    def get_certificate(self):
        """
        Get the certificate in the PKCS #12 structure.

        :return: The certificate, or :py:const:`None` if there is none.
        :rtype: :py:class:`X509` or :py:const:`None`
        """
        return self._cert

    def set_certificate(self, cert):
        """
        Set the certificate in the PKCS #12 structure.

        :param cert: The new certificate, or :py:const:`None` to unset it.
        :type cert: :py:class:`X509` or :py:const:`None`

        :return: ``None``
        :raises TypeError: If *cert* is neither an :py:class:`X509` nor
            :py:const:`None`.
        """
        # ``None`` is accepted so the certificate can be unset, as documented
        # and as the other setters of this class already allow.
        if cert is not None and not isinstance(cert, X509):
            raise TypeError("cert must be an X509 instance")
        self._cert = cert

    def get_privatekey(self):
        """
        Get the private key in the PKCS #12 structure.

        :return: The private key, or :py:const:`None` if there is none.
        :rtype: :py:class:`PKey`
        """
        return self._pkey

    def set_privatekey(self, pkey):
        """
        Set the private key portion of the PKCS #12 structure.

        :param pkey: The new private key, or :py:const:`None` to unset it.
        :type pkey: :py:class:`PKey` or :py:const:`None`

        :return: ``None``
        :raises TypeError: If *pkey* is neither a :py:class:`PKey` nor
            :py:const:`None`.
        """
        # ``None`` is accepted so the private key can be unset, as documented
        # and as the other setters of this class already allow.
        if pkey is not None and not isinstance(pkey, PKey):
            raise TypeError("pkey must be a PKey instance")
        self._pkey = pkey

    def get_ca_certificates(self):
        """
        Get the CA certificates in the PKCS #12 structure.

        :return: A tuple with the CA certificates in the chain, or
            :py:const:`None` if there are none.
        :rtype: :py:class:`tuple` of :py:class:`X509` or :py:const:`None`
        """
        if self._cacerts is not None:
            return tuple(self._cacerts)

    def set_ca_certificates(self, cacerts):
        """
        Replace or set the CA certificates within the PKCS12 object.

        :param cacerts: The new CA certificates, or :py:const:`None` to unset
            them.
        :type cacerts: An iterable of :py:class:`X509` or :py:const:`None`

        :return: ``None``
        :raises TypeError: If any element of *cacerts* is not an
            :py:class:`X509`.
        """
        if cacerts is None:
            self._cacerts = None
        else:
            # Materialize first: *cacerts* may be a one-shot iterator and is
            # walked once for validation and then stored.
            cacerts = list(cacerts)
            for cert in cacerts:
                if not isinstance(cert, X509):
                    raise TypeError(
                        "iterable must only contain X509 instances"
                    )
            self._cacerts = cacerts

    def set_friendlyname(self, name):
        """
        Set the friendly name in the PKCS #12 structure.

        :param name: The new friendly name, or :py:const:`None` to unset.
        :type name: :py:class:`bytes` or :py:const:`None`

        :return: ``None``
        :raises TypeError: If *name* is neither a byte string nor
            :py:const:`None`.
        """
        if name is not None and not isinstance(name, bytes):
            raise TypeError(
                "name must be a byte string or None (not %r)" % (name,)
            )
        self._friendlyname = name

    def get_friendlyname(self):
        """
        Get the friendly name in the PKCS# 12 structure.

        :returns: The friendly name,  or :py:const:`None` if there is none.
        :rtype: :py:class:`bytes` or :py:const:`None`
        """
        return self._friendlyname

    def export(self, passphrase=None, iter=2048, maciter=1):
        """
        Dump a PKCS12 object as a string.

        For more information, see the :c:func:`PKCS12_create` man page.

        :param passphrase: The passphrase used to encrypt the structure. Unlike
            some other passphrase arguments, this *must* be a string, not a
            callback.
        :type passphrase: :py:data:`bytes`

        :param iter: Number of times to repeat the encryption step.
        :type iter: :py:data:`int`

        :param maciter: Number of times to repeat the MAC step.
        :type maciter: :py:data:`int`

        :return: The string representation of the PKCS #12 structure.
        :rtype: :py:data:`bytes`
        """
        passphrase = _text_to_bytes_and_warn("passphrase", passphrase)

        if self._cacerts is None:
            cacerts = _ffi.NULL
        else:
            cacerts = _lib.sk_X509_new_null()
            cacerts = _ffi.gc(cacerts, _lib.sk_X509_free)
            for cert in self._cacerts:
                _lib.sk_X509_push(cacerts, cert._x509)

        # Every part that is unset is handed to OpenSSL as NULL.
        if passphrase is None:
            passphrase = _ffi.NULL

        friendlyname = self._friendlyname
        if friendlyname is None:
            friendlyname = _ffi.NULL

        if self._pkey is None:
            pkey = _ffi.NULL
        else:
            pkey = self._pkey._pkey

        if self._cert is None:
            cert = _ffi.NULL
        else:
            cert = self._cert._x509

        pkcs12 = _lib.PKCS12_create(
            passphrase, friendlyname, pkey, cert, cacerts,
            _lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC,
            _lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC,
            iter, maciter, 0)
        if pkcs12 == _ffi.NULL:
            _raise_current_error()
        pkcs12 = _ffi.gc(pkcs12, _lib.PKCS12_free)

        bio = _new_mem_buf()
        _lib.i2d_PKCS12_bio(bio, pkcs12)
        return _bio_to_string(bio)
2571
2572
# Deprecated alias: ``PKCS12Type`` is ``PKCS12`` wrapped by
# ``cryptography.utils.deprecated`` together with the warning message and
# category below.
PKCS12Type = deprecated(
    PKCS12,
    __name__,
    "PKCS12Type has been deprecated, use PKCS12 instead",
    DeprecationWarning,
)
2578
2579
class NetscapeSPKI(object):
    """
    A Netscape SPKI object.

    Wraps a native ``NETSCAPE_SPKI`` structure that is freed automatically
    together with the Python object.
    """

    def __init__(self):
        self._spki = _ffi.gc(
            _lib.NETSCAPE_SPKI_new(), _lib.NETSCAPE_SPKI_free
        )

    def sign(self, pkey, digest):
        """
        Sign this SPKI object with the given key and digest type.

        :param pkey: The private key to sign with.
        :type pkey: :py:class:`PKey`

        :param digest: The message digest to use.
        :type digest: :py:class:`bytes`

        :return: ``None``
        :raises ValueError: If the key has only a public part, is
            uninitialized, or if *digest* names no known digest.
        """
        # Reject keys that cannot sign before touching OpenSSL at all.
        if pkey._only_public:
            raise ValueError("Key has only public part")
        if not pkey._initialized:
            raise ValueError("Key is uninitialized")

        md = _lib.EVP_get_digestbyname(_byte_string(digest))
        if md == _ffi.NULL:
            raise ValueError("No such digest method")

        outcome = _lib.NETSCAPE_SPKI_sign(self._spki, pkey._pkey, md)
        _openssl_assert(outcome > 0)

    def verify(self, key):
        """
        Check the signature on this SPKI object against a public key.

        :param PKey key: The public key that signature is supposedly from.

        :return: ``True`` if the signature is correct.
        :rtype: bool

        :raises OpenSSL.crypto.Error: If the signature is invalid, or there was
            a problem verifying the signature.
        """
        if _lib.NETSCAPE_SPKI_verify(self._spki, key._pkey) <= 0:
            _raise_current_error()
        return True

    def b64_encode(self):
        """
        Produce the base64 encoded form of this SPKI object.

        :return: The base64 encoded string.
        :rtype: :py:class:`bytes`
        """
        raw = _lib.NETSCAPE_SPKI_b64_encode(self._spki)
        # Copy the text into a Python object before releasing the buffer
        # that OpenSSL allocated for it.
        text = _ffi.string(raw)
        _lib.OPENSSL_free(raw)
        return text

    def get_pubkey(self):
        """
        Extract the public key stored in this SPKI object.

        :return: The public key.
        :rtype: :py:class:`PKey`
        """
        raw_key = _lib.NETSCAPE_SPKI_get_pubkey(self._spki)
        _openssl_assert(raw_key != _ffi.NULL)

        # Build the wrapper without running ``PKey.__init__`` and mark it as
        # holding a public key only.
        public = PKey.__new__(PKey)
        public._pkey = _ffi.gc(raw_key, _lib.EVP_PKEY_free)
        public._only_public = True
        return public

    def set_pubkey(self, pkey):
        """
        Store a public key in this SPKI object.

        :param pkey: The public key
        :return: ``None``
        """
        outcome = _lib.NETSCAPE_SPKI_set_pubkey(self._spki, pkey._pkey)
        _openssl_assert(outcome == 1)
2668
2669
# Deprecated alias: ``NetscapeSPKIType`` is ``NetscapeSPKI`` wrapped by
# ``cryptography.utils.deprecated`` together with the warning message and
# category below.
NetscapeSPKIType = deprecated(
    NetscapeSPKI,
    __name__,
    "NetscapeSPKIType has been deprecated, use NetscapeSPKI instead",
    DeprecationWarning,
)
2675
2676
class _PassphraseHelper(object):
    """
    Adapt a user supplied passphrase to the ``pem_password_cb`` interface.

    The passphrase may be ``None``, a byte string, or a callable.  A byte
    string is handed over as the callback's user data, while a callable is
    wrapped in a C callback.  Exceptions raised while obtaining the
    passphrase are collected and re-raised later by
    :meth:`raise_if_problem`.
    """

    def __init__(self, type, passphrase, more_args=False, truncate=False):
        """
        :param type: The file type the passphrase is used with.
        :param passphrase: ``None``, a byte string, or a callable.
        :param more_args: If true, the callable is invoked as
            ``passphrase(size, rwflag, userdata)`` instead of
            ``passphrase(rwflag)``.
        :param truncate: If true, an over-long passphrase is cut down to the
            buffer size instead of being treated as an error.
        :raises ValueError: If a passphrase is given for a type other than
            :data:`FILETYPE_PEM`.
        """
        if type != FILETYPE_PEM and passphrase is not None:
            raise ValueError(
                "only FILETYPE_PEM key format supports encryption"
            )
        self._problems = []
        self._truncate = truncate
        self._more_args = more_args
        self._passphrase = passphrase

    @property
    def callback(self):
        """
        The callback argument for OpenSSL: a ``pem_password_cb`` wrapper for
        a callable passphrase, ``NULL`` for ``None`` or a byte string.

        :raises TypeError: If the passphrase is of any other kind.
        """
        secret = self._passphrase
        if secret is None or isinstance(secret, bytes):
            return _ffi.NULL
        if callable(secret):
            return _ffi.callback("pem_password_cb", self._read_passphrase)
        raise TypeError(
            "Last argument must be a byte string or a callable."
        )

    @property
    def callback_args(self):
        """
        The user data argument for OpenSSL: the passphrase itself when it is
        a byte string, ``NULL`` for ``None`` or a callable.

        :raises TypeError: If the passphrase is of any other kind.
        """
        secret = self._passphrase
        if secret is None:
            return _ffi.NULL
        if isinstance(secret, bytes):
            return secret
        if callable(secret):
            return _ffi.NULL
        raise TypeError(
            "Last argument must be a byte string or a callable."
        )

    def raise_if_problem(self, exceptionType=Error):
        """
        Re-raise the oldest exception recorded by the passphrase callback,
        if there is one.

        :param exceptionType: The exception type used to drain (and discard)
            the OpenSSL error queue before re-raising.
        """
        if not self._problems:
            return

        # Flush the OpenSSL error queue; whatever it holds is discarded in
        # favour of the recorded callback exception.
        try:
            _exception_from_error_queue(exceptionType)
        except exceptionType:
            pass

        raise self._problems.pop(0)

    def _read_passphrase(self, buf, size, rwflag, userdata):
        """
        Body of the ``pem_password_cb`` callback.

        Obtains the passphrase from the user callable, copies it into *buf*
        and returns the number of bytes written.  Any exception is recorded
        in ``self._problems`` and ``0`` is returned instead.
        """
        try:
            if self._more_args:
                secret = self._passphrase(size, rwflag, userdata)
            else:
                secret = self._passphrase(rwflag)

            if not isinstance(secret, bytes):
                raise ValueError("String expected")

            if len(secret) > size:
                if not self._truncate:
                    raise ValueError(
                        "passphrase returned by callback is too long"
                    )
                secret = secret[:size]

            written = len(secret)
            # Copy one-byte slices so each item assigned is a bytes object
            # of length one.
            for offset in range(written):
                buf[offset] = secret[offset:offset + 1]
            return written
        except Exception as exc:
            self._problems.append(exc)
            return 0
2746
2747
def load_publickey(type, buffer):
    """
    Parse a public key out of *buffer*.

    :param type: The file type (one of :data:`FILETYPE_PEM`,
        :data:`FILETYPE_ASN1`).
    :param buffer: The buffer the key is stored in.
    :type buffer: A Python string object, either unicode or bytestring.
        Unicode input is encoded as ASCII first.
    :return: The PKey object, flagged as public-only.
    :rtype: :class:`PKey`
    :raises ValueError: If *type* is neither of the supported file types.
    """
    raw = buffer.encode("ascii") if isinstance(buffer, _text_type) else buffer
    source = _new_mem_buf(raw)

    if type == FILETYPE_PEM:
        native_key = _lib.PEM_read_bio_PUBKEY(
            source, _ffi.NULL, _ffi.NULL, _ffi.NULL)
    elif type == FILETYPE_ASN1:
        native_key = _lib.d2i_PUBKEY_bio(source, _ffi.NULL)
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    # A NULL result means OpenSSL could not parse the key.
    if native_key == _ffi.NULL:
        _raise_current_error()

    public_key = PKey.__new__(PKey)
    public_key._pkey = _ffi.gc(native_key, _lib.EVP_PKEY_free)
    public_key._only_public = True
    return public_key
2779
2780
def load_privatekey(type, buffer, passphrase=None):
    """
    Parse a private key (PKey) out of *buffer*, which is encoded with the
    type *type*.

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
    :param buffer: The buffer the key is stored in.  Unicode input is
                   encoded as ASCII first.
    :param passphrase: (optional) if encrypted PEM format, this can be
                       either the passphrase to use, or a callback for
                       providing the passphrase.

    :return: The PKey object
    :raises ValueError: If *type* is not a supported file type, or if a
        passphrase is supplied for a type other than FILETYPE_PEM.
    """
    raw = buffer.encode("ascii") if isinstance(buffer, _text_type) else buffer
    source = _new_mem_buf(raw)

    passphrase_helper = _PassphraseHelper(type, passphrase)
    if type == FILETYPE_PEM:
        native_key = _lib.PEM_read_bio_PrivateKey(
            source,
            _ffi.NULL,
            passphrase_helper.callback,
            passphrase_helper.callback_args,
        )
        # An exception raised inside the passphrase callback takes priority
        # over whatever OpenSSL itself reported.
        passphrase_helper.raise_if_problem()
    elif type == FILETYPE_ASN1:
        native_key = _lib.d2i_PrivateKey_bio(source, _ffi.NULL)
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    if native_key == _ffi.NULL:
        _raise_current_error()

    private_key = PKey.__new__(PKey)
    private_key._pkey = _ffi.gc(native_key, _lib.EVP_PKEY_free)
    return private_key
2815
2816
def dump_certificate_request(type, req):
    """
    Serialize the certificate request *req* using the encoding *type*.

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1, or
        FILETYPE_TEXT)
    :param req: The certificate request to dump
    :return: The buffer with the dumped certificate request in
    :raises ValueError: If *type* is not one of the supported file types.
    """
    sink = _new_mem_buf()
    native_req = req._req

    if type == FILETYPE_PEM:
        status = _lib.PEM_write_bio_X509_REQ(sink, native_req)
    elif type == FILETYPE_ASN1:
        status = _lib.i2d_X509_REQ_bio(sink, native_req)
    elif type == FILETYPE_TEXT:
        status = _lib.X509_REQ_print_ex(sink, native_req, 0, 0)
    else:
        raise ValueError(
            "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
            "FILETYPE_TEXT"
        )

    # A zero status is treated as failure.
    _openssl_assert(status != 0)

    return _bio_to_string(sink)
2843
2844
def load_certificate_request(type, buffer):
    """
    Parse a certificate request (X509Req) out of *buffer*, which is encoded
    with the type *type*.

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
    :param buffer: The buffer the certificate request is stored in.  Unicode
        input is encoded as ASCII first.
    :return: The X509Req object
    :raises ValueError: If *type* is neither of the supported file types.
    """
    raw = buffer.encode("ascii") if isinstance(buffer, _text_type) else buffer
    source = _new_mem_buf(raw)

    if type == FILETYPE_PEM:
        native_req = _lib.PEM_read_bio_X509_REQ(
            source, _ffi.NULL, _ffi.NULL, _ffi.NULL)
    elif type == FILETYPE_ASN1:
        native_req = _lib.d2i_X509_REQ_bio(source, _ffi.NULL)
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    _openssl_assert(native_req != _ffi.NULL)

    # Wrap the native request without running X509Req.__init__ and tie its
    # lifetime to the wrapper.
    request = X509Req.__new__(X509Req)
    request._req = _ffi.gc(native_req, _lib.X509_REQ_free)
    return request
2871
2872
def sign(pkey, data, digest):
    """
    Produce a signature over *data* with the given key and message digest.

    :param pkey: PKey to sign with
    :param data: data to be signed
    :param digest: name of the message digest to use
    :return: signature
    :raises ValueError: If *digest* does not name a known digest.

    .. versionadded:: 0.11
    """
    payload = _text_to_bytes_and_warn("data", data)

    md = _lib.EVP_get_digestbyname(_byte_string(digest))
    if md == _ffi.NULL:
        raise ValueError("No such digest method")

    ctx = _lib.Cryptography_EVP_MD_CTX_new()
    ctx = _ffi.gc(ctx, _lib.Cryptography_EVP_MD_CTX_free)

    _lib.EVP_SignInit(ctx, md)
    _lib.EVP_SignUpdate(ctx, payload, len(payload))

    # EVP_PKEY_size gives the size of the output buffer to allocate; the
    # number of bytes actually written is reported via ``written``.
    capacity = _lib.EVP_PKEY_size(pkey._pkey)
    _openssl_assert(capacity > 0)
    out = _ffi.new("unsigned char[]", capacity)
    written = _ffi.new("unsigned int *")
    status = _lib.EVP_SignFinal(ctx, out, written, pkey._pkey)
    _openssl_assert(status == 1)

    return _ffi.buffer(out, written[0])[:]
2905
2906
def verify(cert, signature, data, digest):
    """
    Check *signature* against *data* using the public key of *cert*.

    :param cert: signing certificate (X509 object) corresponding to the
        private key which generated the signature.
    :param signature: signature returned by sign function
    :param data: data to be verified
    :param digest: name of the message digest to use
    :return: ``None`` if the signature is correct, raise exception otherwise.
    :raises ValueError: If *digest* does not name a known digest.

    .. versionadded:: 0.11
    """
    payload = _text_to_bytes_and_warn("data", data)

    md = _lib.EVP_get_digestbyname(_byte_string(digest))
    if md == _ffi.NULL:
        raise ValueError("No such digest method")

    public_key = _lib.X509_get_pubkey(cert._x509)
    _openssl_assert(public_key != _ffi.NULL)
    public_key = _ffi.gc(public_key, _lib.EVP_PKEY_free)

    ctx = _lib.Cryptography_EVP_MD_CTX_new()
    ctx = _ffi.gc(ctx, _lib.Cryptography_EVP_MD_CTX_free)

    _lib.EVP_VerifyInit(ctx, md)
    _lib.EVP_VerifyUpdate(ctx, payload, len(payload))
    outcome = _lib.EVP_VerifyFinal(
        ctx, signature, len(signature), public_key
    )

    # Anything other than 1 is reported as a failure.
    if outcome != 1:
        _raise_current_error()
2941
2942
def dump_crl(type, crl):
    """
    Dump a certificate revocation list to a buffer.

    :param type: The file type (one of ``FILETYPE_PEM``, ``FILETYPE_ASN1``, or
        ``FILETYPE_TEXT``).
    :param CRL crl: The CRL to dump.

    :return: The buffer with the CRL.
    :rtype: bytes
    :raises ValueError: If *type* is not one of the supported file types.
    """
    bio = _new_mem_buf()

    if type == FILETYPE_PEM:
        ret = _lib.PEM_write_bio_X509_CRL(bio, crl._crl)
    elif type == FILETYPE_ASN1:
        ret = _lib.i2d_X509_CRL_bio(bio, crl._crl)
    elif type == FILETYPE_TEXT:
        ret = _lib.X509_CRL_print(bio, crl._crl)
    else:
        raise ValueError(
            "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or "
            "FILETYPE_TEXT")

    # Check the OpenSSL return code with the module's assertion helper
    # rather than a bare ``assert``: bare asserts are removed when Python
    # runs with -O, which would let a failed write go unnoticed and return
    # a truncated or empty buffer.  This also matches the other dump_*
    # functions in this module.
    _openssl_assert(ret == 1)
    return _bio_to_string(bio)
2969
2970
def load_crl(type, buffer):
    """
    Parse Certificate Revocation List (CRL) data out of *buffer*, which is
    encoded with the type *type*.

    :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1)
    :param buffer: The buffer the CRL is stored in.  Unicode input is encoded
        as ASCII first.

    :return: The CRL object
    :raises ValueError: If *type* is neither of the supported file types.
    """
    raw = buffer.encode("ascii") if isinstance(buffer, _text_type) else buffer
    source = _new_mem_buf(raw)

    if type == FILETYPE_PEM:
        native_crl = _lib.PEM_read_bio_X509_CRL(
            source, _ffi.NULL, _ffi.NULL, _ffi.NULL)
    elif type == FILETYPE_ASN1:
        native_crl = _lib.d2i_X509_CRL_bio(source, _ffi.NULL)
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    # A NULL result means OpenSSL could not parse the data.
    if native_crl == _ffi.NULL:
        _raise_current_error()

    loaded = CRL.__new__(CRL)
    loaded._crl = _ffi.gc(native_crl, _lib.X509_CRL_free)
    return loaded
2999
3000
def load_pkcs7_data(type, buffer):
    """
    Parse pkcs7 data out of *buffer*, which is encoded with the type *type*.

    :param type: The file type (one of FILETYPE_PEM or FILETYPE_ASN1)
    :param buffer: The buffer with the pkcs7 data.  Unicode input is encoded
        as ASCII first.
    :return: The PKCS7 object
    :raises ValueError: If *type* is neither of the supported file types.
    """
    raw = buffer.encode("ascii") if isinstance(buffer, _text_type) else buffer
    source = _new_mem_buf(raw)

    if type == FILETYPE_PEM:
        native_pkcs7 = _lib.PEM_read_bio_PKCS7(
            source, _ffi.NULL, _ffi.NULL, _ffi.NULL)
    elif type == FILETYPE_ASN1:
        native_pkcs7 = _lib.d2i_PKCS7_bio(source, _ffi.NULL)
    else:
        raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1")

    # A NULL result means OpenSSL could not parse the data.
    if native_pkcs7 == _ffi.NULL:
        _raise_current_error()

    loaded = PKCS7.__new__(PKCS7)
    loaded._pkcs7 = _ffi.gc(native_pkcs7, _lib.PKCS7_free)
    return loaded
3028
3029
def load_pkcs12(buffer, passphrase=None):
    """
    Load pkcs12 data from the string *buffer*. If the pkcs12 structure is
    encrypted, a *passphrase* must be included.  The MAC is always
    checked and thus required.

    See also the man page for the C function :py:func:`PKCS12_parse`.

    :param buffer: The buffer the certificate is stored in.  Unicode input is
        encoded as ASCII first.
    :param passphrase: (Optional) The password to decrypt the PKCS12 lump
    :returns: The PKCS12 object.  Its private key, certificate, CA
        certificates and friendly name are each ``None`` when the structure
        does not provide them.
    """
    passphrase = _text_to_bytes_and_warn("passphrase", passphrase)

    if isinstance(buffer, _text_type):
        buffer = buffer.encode("ascii")

    bio = _new_mem_buf(buffer)

    # Use null passphrase if passphrase is None or empty string. With PKCS#12
    # password based encryption no password and a zero length password are two
    # different things, but OpenSSL implementation will try both to figure out
    # which one works.
    if not passphrase:
        passphrase = _ffi.NULL

    p12 = _lib.d2i_PKCS12_bio(bio, _ffi.NULL)
    if p12 == _ffi.NULL:
        _raise_current_error()
    p12 = _ffi.gc(p12, _lib.PKCS12_free)

    # Output slots filled in by PKCS12_parse.
    pkey = _ffi.new("EVP_PKEY**")
    cert = _ffi.new("X509**")
    cacerts = _ffi.new("Cryptography_STACK_OF_X509**")

    parse_result = _lib.PKCS12_parse(p12, passphrase, pkey, cert, cacerts)
    if not parse_result:
        _raise_current_error()

    # Only the stack container is released here; each certificate it holds is
    # wrapped individually below.
    cacerts = _ffi.gc(cacerts[0], _lib.sk_X509_free)

    # openssl 1.0.0 sometimes leaves an X509_check_private_key error in the
    # queue for no particular reason.  This error isn't interesting to anyone
    # outside this function.  It's not even interesting to us.  Get rid of it.
    try:
        _raise_current_error()
    except Error:
        pass

    if pkey[0] == _ffi.NULL:
        pykey = None
    else:
        pykey = PKey.__new__(PKey)
        pykey._pkey = _ffi.gc(pkey[0], _lib.EVP_PKEY_free)

    if cert[0] == _ffi.NULL:
        pycert = None
        friendlyname = None
    else:
        pycert = X509._from_raw_x509_ptr(cert[0])

        friendlyname_length = _ffi.new("int*")
        friendlyname_buffer = _lib.X509_alias_get0(
            cert[0], friendlyname_length
        )
        # Check for a missing alias *before* touching the buffer, so a NULL
        # pointer is never handed to ``_ffi.buffer``.
        if friendlyname_buffer == _ffi.NULL:
            friendlyname = None
        else:
            friendlyname = _ffi.buffer(
                friendlyname_buffer, friendlyname_length[0]
            )[:]

    pycacerts = [
        X509._from_raw_x509_ptr(_lib.sk_X509_value(cacerts, i))
        for i in range(_lib.sk_X509_num(cacerts))
    ]
    # An empty list of CA certificates is reported as None.
    if not pycacerts:
        pycacerts = None

    pkcs12 = PKCS12.__new__(PKCS12)
    pkcs12._pkey = pykey
    pkcs12._cert = pycert
    pkcs12._cacerts = pycacerts
    pkcs12._friendlyname = friendlyname
    return pkcs12
3115
3116
# Import-time OpenSSL setup: the three calls below are module-level
# statements and therefore run when this module is imported.
#
# There are no direct unit tests for this initialization.  It is tested
# indirectly since it is necessary for functions like dump_privatekey when
# using encryption.
#
# Thus OpenSSL.test.test_crypto.FunctionTests.test_dump_privatekey_passphrase
# and some other similar tests may fail without this (though they may not if
# the Python runtime has already done some initialization of the underlying
# OpenSSL library (and is linked against the same one that cryptography is
# using)).
_lib.OpenSSL_add_all_algorithms()

# This is similar but exercised mainly by exception_from_error_queue.  It calls
# both ERR_load_crypto_strings() and ERR_load_SSL_strings().
_lib.SSL_load_error_strings()


# Set the default string mask to match OpenSSL upstream (since 2005) and
# RFC5280 recommendations.
_lib.ASN1_STRING_set_default_mask_asc(b'utf8only')
3136