1import datetime 2 3from base64 import b16encode 4from functools import partial 5from operator import __eq__, __ne__, __lt__, __le__, __gt__, __ge__ 6 7from six import ( 8 integer_types as _integer_types, 9 text_type as _text_type, 10 PY3 as _PY3) 11 12from cryptography import x509 13from cryptography.hazmat.primitives.asymmetric import dsa, rsa 14from cryptography.utils import deprecated 15 16from OpenSSL._util import ( 17 ffi as _ffi, 18 lib as _lib, 19 exception_from_error_queue as _exception_from_error_queue, 20 byte_string as _byte_string, 21 native as _native, 22 UNSPECIFIED as _UNSPECIFIED, 23 text_to_bytes_and_warn as _text_to_bytes_and_warn, 24 make_assert as _make_assert, 25) 26 27__all__ = [ 28 'FILETYPE_PEM', 29 'FILETYPE_ASN1', 30 'FILETYPE_TEXT', 31 'TYPE_RSA', 32 'TYPE_DSA', 33 'Error', 34 'PKey', 35 'get_elliptic_curves', 36 'get_elliptic_curve', 37 'X509Name', 38 'X509Extension', 39 'X509Req', 40 'X509', 41 'X509StoreFlags', 42 'X509Store', 43 'X509StoreContextError', 44 'X509StoreContext', 45 'load_certificate', 46 'dump_certificate', 47 'dump_publickey', 48 'dump_privatekey', 49 'Revoked', 50 'CRL', 51 'PKCS7', 52 'PKCS12', 53 'NetscapeSPKI', 54 'load_publickey', 55 'load_privatekey', 56 'dump_certificate_request', 57 'load_certificate_request', 58 'sign', 59 'verify', 60 'dump_crl', 61 'load_crl', 62 'load_pkcs7_data', 63 'load_pkcs12' 64] 65 66FILETYPE_PEM = _lib.SSL_FILETYPE_PEM 67FILETYPE_ASN1 = _lib.SSL_FILETYPE_ASN1 68 69# TODO This was an API mistake. OpenSSL has no such constant. 70FILETYPE_TEXT = 2 ** 16 - 1 71 72TYPE_RSA = _lib.EVP_PKEY_RSA 73TYPE_DSA = _lib.EVP_PKEY_DSA 74TYPE_DH = _lib.EVP_PKEY_DH 75TYPE_EC = _lib.EVP_PKEY_EC 76 77 78class Error(Exception): 79 """ 80 An error occurred in an `OpenSSL.crypto` API. 81 """ 82 83 84_raise_current_error = partial(_exception_from_error_queue, Error) 85_openssl_assert = _make_assert(Error) 86 87 88def _get_backend(): 89 """ 90 Importing the backend from cryptography has the side effect of activating 91 the osrandom engine. This mutates the global state of OpenSSL in the 92 process and causes issues for various programs that use subinterpreters or 93 embed Python. By putting the import in this function we can avoid 94 triggering this side effect unless _get_backend is called. 95 """ 96 from cryptography.hazmat.backends.openssl.backend import backend 97 return backend 98 99 100def _untested_error(where): 101 """ 102 An OpenSSL API failed somehow. Additionally, the failure which was 103 encountered isn't one that's exercised by the test suite so future behavior 104 of pyOpenSSL is now somewhat less predictable. 105 """ 106 raise RuntimeError("Unknown %s failure" % (where,)) 107 108 109def _new_mem_buf(buffer=None): 110 """ 111 Allocate a new OpenSSL memory BIO. 112 113 Arrange for the garbage collector to clean it up automatically. 114 115 :param buffer: None or some bytes to use to put into the BIO so that they 116 can be read out. 117 """ 118 if buffer is None: 119 bio = _lib.BIO_new(_lib.BIO_s_mem()) 120 free = _lib.BIO_free 121 else: 122 data = _ffi.new("char[]", buffer) 123 bio = _lib.BIO_new_mem_buf(data, len(buffer)) 124 125 # Keep the memory alive as long as the bio is alive! 126 def free(bio, ref=data): 127 return _lib.BIO_free(bio) 128 129 _openssl_assert(bio != _ffi.NULL) 130 131 bio = _ffi.gc(bio, free) 132 return bio 133 134 135def _bio_to_string(bio): 136 """ 137 Copy the contents of an OpenSSL BIO object into a Python byte string. 138 """ 139 result_buffer = _ffi.new('char**') 140 buffer_length = _lib.BIO_get_mem_data(bio, result_buffer) 141 return _ffi.buffer(result_buffer[0], buffer_length)[:] 142 143 144def _set_asn1_time(boundary, when): 145 """ 146 The the time value of an ASN1 time object. 147 148 @param boundary: An ASN1_TIME pointer (or an object safely 149 castable to that type) which will have its value set. 150 @param when: A string representation of the desired time value. 151 152 @raise TypeError: If C{when} is not a L{bytes} string. 153 @raise ValueError: If C{when} does not represent a time in the required 154 format. 155 @raise RuntimeError: If the time value cannot be set for some other 156 (unspecified) reason. 157 """ 158 if not isinstance(when, bytes): 159 raise TypeError("when must be a byte string") 160 161 set_result = _lib.ASN1_TIME_set_string(boundary, when) 162 if set_result == 0: 163 raise ValueError("Invalid string") 164 165 166def _get_asn1_time(timestamp): 167 """ 168 Retrieve the time value of an ASN1 time object. 169 170 @param timestamp: An ASN1_GENERALIZEDTIME* (or an object safely castable to 171 that type) from which the time value will be retrieved. 172 173 @return: The time value from C{timestamp} as a L{bytes} string in a certain 174 format. Or C{None} if the object contains no time value. 175 """ 176 string_timestamp = _ffi.cast('ASN1_STRING*', timestamp) 177 if _lib.ASN1_STRING_length(string_timestamp) == 0: 178 return None 179 elif ( 180 _lib.ASN1_STRING_type(string_timestamp) == _lib.V_ASN1_GENERALIZEDTIME 181 ): 182 return _ffi.string(_lib.ASN1_STRING_data(string_timestamp)) 183 else: 184 generalized_timestamp = _ffi.new("ASN1_GENERALIZEDTIME**") 185 _lib.ASN1_TIME_to_generalizedtime(timestamp, generalized_timestamp) 186 if generalized_timestamp[0] == _ffi.NULL: 187 # This may happen: 188 # - if timestamp was not an ASN1_TIME 189 # - if allocating memory for the ASN1_GENERALIZEDTIME failed 190 # - if a copy of the time data from timestamp cannot be made for 191 # the newly allocated ASN1_GENERALIZEDTIME 192 # 193 # These are difficult to test. cffi enforces the ASN1_TIME type. 194 # Memory allocation failures are a pain to trigger 195 # deterministically. 196 _untested_error("ASN1_TIME_to_generalizedtime") 197 else: 198 string_timestamp = _ffi.cast( 199 "ASN1_STRING*", generalized_timestamp[0]) 200 string_data = _lib.ASN1_STRING_data(string_timestamp) 201 string_result = _ffi.string(string_data) 202 _lib.ASN1_GENERALIZEDTIME_free(generalized_timestamp[0]) 203 return string_result 204 205 206class _X509NameInvalidator(object): 207 def __init__(self): 208 self._names = [] 209 210 def add(self, name): 211 self._names.append(name) 212 213 def clear(self): 214 for name in self._names: 215 # Breaks the object, but also prevents UAF! 216 del name._name 217 218 219class PKey(object): 220 """ 221 A class representing an DSA or RSA public key or key pair. 222 """ 223 _only_public = False 224 _initialized = True 225 226 def __init__(self): 227 pkey = _lib.EVP_PKEY_new() 228 self._pkey = _ffi.gc(pkey, _lib.EVP_PKEY_free) 229 self._initialized = False 230 231 def to_cryptography_key(self): 232 """ 233 Export as a ``cryptography`` key. 234 235 :rtype: One of ``cryptography``'s `key interfaces`_. 236 237 .. _key interfaces: https://cryptography.io/en/latest/hazmat/\ 238 primitives/asymmetric/rsa/#key-interfaces 239 240 .. versionadded:: 16.1.0 241 """ 242 backend = _get_backend() 243 if self._only_public: 244 return backend._evp_pkey_to_public_key(self._pkey) 245 else: 246 return backend._evp_pkey_to_private_key(self._pkey) 247 248 @classmethod 249 def from_cryptography_key(cls, crypto_key): 250 """ 251 Construct based on a ``cryptography`` *crypto_key*. 252 253 :param crypto_key: A ``cryptography`` key. 254 :type crypto_key: One of ``cryptography``'s `key interfaces`_. 255 256 :rtype: PKey 257 258 .. versionadded:: 16.1.0 259 """ 260 pkey = cls() 261 if not isinstance(crypto_key, (rsa.RSAPublicKey, rsa.RSAPrivateKey, 262 dsa.DSAPublicKey, dsa.DSAPrivateKey)): 263 raise TypeError("Unsupported key type") 264 265 pkey._pkey = crypto_key._evp_pkey 266 if isinstance(crypto_key, (rsa.RSAPublicKey, dsa.DSAPublicKey)): 267 pkey._only_public = True 268 pkey._initialized = True 269 return pkey 270 271 def generate_key(self, type, bits): 272 """ 273 Generate a key pair of the given type, with the given number of bits. 274 275 This generates a key "into" the this object. 276 277 :param type: The key type. 278 :type type: :py:data:`TYPE_RSA` or :py:data:`TYPE_DSA` 279 :param bits: The number of bits. 280 :type bits: :py:data:`int` ``>= 0`` 281 :raises TypeError: If :py:data:`type` or :py:data:`bits` isn't 282 of the appropriate type. 283 :raises ValueError: If the number of bits isn't an integer of 284 the appropriate size. 285 :return: ``None`` 286 """ 287 if not isinstance(type, int): 288 raise TypeError("type must be an integer") 289 290 if not isinstance(bits, int): 291 raise TypeError("bits must be an integer") 292 293 if type == TYPE_RSA: 294 if bits <= 0: 295 raise ValueError("Invalid number of bits") 296 297 # TODO Check error return 298 exponent = _lib.BN_new() 299 exponent = _ffi.gc(exponent, _lib.BN_free) 300 _lib.BN_set_word(exponent, _lib.RSA_F4) 301 302 rsa = _lib.RSA_new() 303 304 result = _lib.RSA_generate_key_ex(rsa, bits, exponent, _ffi.NULL) 305 _openssl_assert(result == 1) 306 307 result = _lib.EVP_PKEY_assign_RSA(self._pkey, rsa) 308 _openssl_assert(result == 1) 309 310 elif type == TYPE_DSA: 311 dsa = _lib.DSA_new() 312 _openssl_assert(dsa != _ffi.NULL) 313 314 dsa = _ffi.gc(dsa, _lib.DSA_free) 315 res = _lib.DSA_generate_parameters_ex( 316 dsa, bits, _ffi.NULL, 0, _ffi.NULL, _ffi.NULL, _ffi.NULL 317 ) 318 _openssl_assert(res == 1) 319 320 _openssl_assert(_lib.DSA_generate_key(dsa) == 1) 321 _openssl_assert(_lib.EVP_PKEY_set1_DSA(self._pkey, dsa) == 1) 322 else: 323 raise Error("No such key type") 324 325 self._initialized = True 326 327 def check(self): 328 """ 329 Check the consistency of an RSA private key. 330 331 This is the Python equivalent of OpenSSL's ``RSA_check_key``. 332 333 :return: ``True`` if key is consistent. 334 335 :raise OpenSSL.crypto.Error: if the key is inconsistent. 336 337 :raise TypeError: if the key is of a type which cannot be checked. 338 Only RSA keys can currently be checked. 339 """ 340 if self._only_public: 341 raise TypeError("public key only") 342 343 if _lib.EVP_PKEY_type(self.type()) != _lib.EVP_PKEY_RSA: 344 raise TypeError("key type unsupported") 345 346 rsa = _lib.EVP_PKEY_get1_RSA(self._pkey) 347 rsa = _ffi.gc(rsa, _lib.RSA_free) 348 result = _lib.RSA_check_key(rsa) 349 if result: 350 return True 351 _raise_current_error() 352 353 def type(self): 354 """ 355 Returns the type of the key 356 357 :return: The type of the key. 358 """ 359 return _lib.EVP_PKEY_id(self._pkey) 360 361 def bits(self): 362 """ 363 Returns the number of bits of the key 364 365 :return: The number of bits of the key. 366 """ 367 return _lib.EVP_PKEY_bits(self._pkey) 368 369 370PKeyType = deprecated( 371 PKey, __name__, 372 "PKeyType has been deprecated, use PKey instead", 373 DeprecationWarning 374) 375 376 377class _EllipticCurve(object): 378 """ 379 A representation of a supported elliptic curve. 380 381 @cvar _curves: :py:obj:`None` until an attempt is made to load the curves. 382 Thereafter, a :py:type:`set` containing :py:type:`_EllipticCurve` 383 instances each of which represents one curve supported by the system. 384 @type _curves: :py:type:`NoneType` or :py:type:`set` 385 """ 386 _curves = None 387 388 if _PY3: 389 # This only necessary on Python 3. Morever, it is broken on Python 2. 390 def __ne__(self, other): 391 """ 392 Implement cooperation with the right-hand side argument of ``!=``. 393 394 Python 3 seems to have dropped this cooperation in this very narrow 395 circumstance. 396 """ 397 if isinstance(other, _EllipticCurve): 398 return super(_EllipticCurve, self).__ne__(other) 399 return NotImplemented 400 401 @classmethod 402 def _load_elliptic_curves(cls, lib): 403 """ 404 Get the curves supported by OpenSSL. 405 406 :param lib: The OpenSSL library binding object. 407 408 :return: A :py:type:`set` of ``cls`` instances giving the names of the 409 elliptic curves the underlying library supports. 410 """ 411 num_curves = lib.EC_get_builtin_curves(_ffi.NULL, 0) 412 builtin_curves = _ffi.new('EC_builtin_curve[]', num_curves) 413 # The return value on this call should be num_curves again. We 414 # could check it to make sure but if it *isn't* then.. what could 415 # we do? Abort the whole process, I suppose...? -exarkun 416 lib.EC_get_builtin_curves(builtin_curves, num_curves) 417 return set( 418 cls.from_nid(lib, c.nid) 419 for c in builtin_curves) 420 421 @classmethod 422 def _get_elliptic_curves(cls, lib): 423 """ 424 Get, cache, and return the curves supported by OpenSSL. 425 426 :param lib: The OpenSSL library binding object. 427 428 :return: A :py:type:`set` of ``cls`` instances giving the names of the 429 elliptic curves the underlying library supports. 430 """ 431 if cls._curves is None: 432 cls._curves = cls._load_elliptic_curves(lib) 433 return cls._curves 434 435 @classmethod 436 def from_nid(cls, lib, nid): 437 """ 438 Instantiate a new :py:class:`_EllipticCurve` associated with the given 439 OpenSSL NID. 440 441 :param lib: The OpenSSL library binding object. 442 443 :param nid: The OpenSSL NID the resulting curve object will represent. 444 This must be a curve NID (and not, for example, a hash NID) or 445 subsequent operations will fail in unpredictable ways. 446 :type nid: :py:class:`int` 447 448 :return: The curve object. 449 """ 450 return cls(lib, nid, _ffi.string(lib.OBJ_nid2sn(nid)).decode("ascii")) 451 452 def __init__(self, lib, nid, name): 453 """ 454 :param _lib: The :py:mod:`cryptography` binding instance used to 455 interface with OpenSSL. 456 457 :param _nid: The OpenSSL NID identifying the curve this object 458 represents. 459 :type _nid: :py:class:`int` 460 461 :param name: The OpenSSL short name identifying the curve this object 462 represents. 463 :type name: :py:class:`unicode` 464 """ 465 self._lib = lib 466 self._nid = nid 467 self.name = name 468 469 def __repr__(self): 470 return "<Curve %r>" % (self.name,) 471 472 def _to_EC_KEY(self): 473 """ 474 Create a new OpenSSL EC_KEY structure initialized to use this curve. 475 476 The structure is automatically garbage collected when the Python object 477 is garbage collected. 478 """ 479 key = self._lib.EC_KEY_new_by_curve_name(self._nid) 480 return _ffi.gc(key, _lib.EC_KEY_free) 481 482 483def get_elliptic_curves(): 484 """ 485 Return a set of objects representing the elliptic curves supported in the 486 OpenSSL build in use. 487 488 The curve objects have a :py:class:`unicode` ``name`` attribute by which 489 they identify themselves. 490 491 The curve objects are useful as values for the argument accepted by 492 :py:meth:`Context.set_tmp_ecdh` to specify which elliptical curve should be 493 used for ECDHE key exchange. 494 """ 495 return _EllipticCurve._get_elliptic_curves(_lib) 496 497 498def get_elliptic_curve(name): 499 """ 500 Return a single curve object selected by name. 501 502 See :py:func:`get_elliptic_curves` for information about curve objects. 503 504 :param name: The OpenSSL short name identifying the curve object to 505 retrieve. 506 :type name: :py:class:`unicode` 507 508 If the named curve is not supported then :py:class:`ValueError` is raised. 509 """ 510 for curve in get_elliptic_curves(): 511 if curve.name == name: 512 return curve 513 raise ValueError("unknown curve name", name) 514 515 516class X509Name(object): 517 """ 518 An X.509 Distinguished Name. 519 520 :ivar countryName: The country of the entity. 521 :ivar C: Alias for :py:attr:`countryName`. 522 523 :ivar stateOrProvinceName: The state or province of the entity. 524 :ivar ST: Alias for :py:attr:`stateOrProvinceName`. 525 526 :ivar localityName: The locality of the entity. 527 :ivar L: Alias for :py:attr:`localityName`. 528 529 :ivar organizationName: The organization name of the entity. 530 :ivar O: Alias for :py:attr:`organizationName`. 531 532 :ivar organizationalUnitName: The organizational unit of the entity. 533 :ivar OU: Alias for :py:attr:`organizationalUnitName` 534 535 :ivar commonName: The common name of the entity. 536 :ivar CN: Alias for :py:attr:`commonName`. 537 538 :ivar emailAddress: The e-mail address of the entity. 539 """ 540 541 def __init__(self, name): 542 """ 543 Create a new X509Name, copying the given X509Name instance. 544 545 :param name: The name to copy. 546 :type name: :py:class:`X509Name` 547 """ 548 name = _lib.X509_NAME_dup(name._name) 549 self._name = _ffi.gc(name, _lib.X509_NAME_free) 550 551 def __setattr__(self, name, value): 552 if name.startswith('_'): 553 return super(X509Name, self).__setattr__(name, value) 554 555 # Note: we really do not want str subclasses here, so we do not use 556 # isinstance. 557 if type(name) is not str: 558 raise TypeError("attribute name must be string, not '%.200s'" % ( 559 type(value).__name__,)) 560 561 nid = _lib.OBJ_txt2nid(_byte_string(name)) 562 if nid == _lib.NID_undef: 563 try: 564 _raise_current_error() 565 except Error: 566 pass 567 raise AttributeError("No such attribute") 568 569 # If there's an old entry for this NID, remove it 570 for i in range(_lib.X509_NAME_entry_count(self._name)): 571 ent = _lib.X509_NAME_get_entry(self._name, i) 572 ent_obj = _lib.X509_NAME_ENTRY_get_object(ent) 573 ent_nid = _lib.OBJ_obj2nid(ent_obj) 574 if nid == ent_nid: 575 ent = _lib.X509_NAME_delete_entry(self._name, i) 576 _lib.X509_NAME_ENTRY_free(ent) 577 break 578 579 if isinstance(value, _text_type): 580 value = value.encode('utf-8') 581 582 add_result = _lib.X509_NAME_add_entry_by_NID( 583 self._name, nid, _lib.MBSTRING_UTF8, value, -1, -1, 0) 584 if not add_result: 585 _raise_current_error() 586 587 def __getattr__(self, name): 588 """ 589 Find attribute. An X509Name object has the following attributes: 590 countryName (alias C), stateOrProvince (alias ST), locality (alias L), 591 organization (alias O), organizationalUnit (alias OU), commonName 592 (alias CN) and more... 593 """ 594 nid = _lib.OBJ_txt2nid(_byte_string(name)) 595 if nid == _lib.NID_undef: 596 # This is a bit weird. OBJ_txt2nid indicated failure, but it seems 597 # a lower level function, a2d_ASN1_OBJECT, also feels the need to 598 # push something onto the error queue. If we don't clean that up 599 # now, someone else will bump into it later and be quite confused. 600 # See lp#314814. 601 try: 602 _raise_current_error() 603 except Error: 604 pass 605 return super(X509Name, self).__getattr__(name) 606 607 entry_index = _lib.X509_NAME_get_index_by_NID(self._name, nid, -1) 608 if entry_index == -1: 609 return None 610 611 entry = _lib.X509_NAME_get_entry(self._name, entry_index) 612 data = _lib.X509_NAME_ENTRY_get_data(entry) 613 614 result_buffer = _ffi.new("unsigned char**") 615 data_length = _lib.ASN1_STRING_to_UTF8(result_buffer, data) 616 _openssl_assert(data_length >= 0) 617 618 try: 619 result = _ffi.buffer( 620 result_buffer[0], data_length 621 )[:].decode('utf-8') 622 finally: 623 # XXX untested 624 _lib.OPENSSL_free(result_buffer[0]) 625 return result 626 627 def _cmp(op): 628 def f(self, other): 629 if not isinstance(other, X509Name): 630 return NotImplemented 631 result = _lib.X509_NAME_cmp(self._name, other._name) 632 return op(result, 0) 633 return f 634 635 __eq__ = _cmp(__eq__) 636 __ne__ = _cmp(__ne__) 637 638 __lt__ = _cmp(__lt__) 639 __le__ = _cmp(__le__) 640 641 __gt__ = _cmp(__gt__) 642 __ge__ = _cmp(__ge__) 643 644 def __repr__(self): 645 """ 646 String representation of an X509Name 647 """ 648 result_buffer = _ffi.new("char[]", 512) 649 format_result = _lib.X509_NAME_oneline( 650 self._name, result_buffer, len(result_buffer)) 651 _openssl_assert(format_result != _ffi.NULL) 652 653 return "<X509Name object '%s'>" % ( 654 _native(_ffi.string(result_buffer)),) 655 656 def hash(self): 657 """ 658 Return an integer representation of the first four bytes of the 659 MD5 digest of the DER representation of the name. 660 661 This is the Python equivalent of OpenSSL's ``X509_NAME_hash``. 662 663 :return: The (integer) hash of this name. 664 :rtype: :py:class:`int` 665 """ 666 return _lib.X509_NAME_hash(self._name) 667 668 def der(self): 669 """ 670 Return the DER encoding of this name. 671 672 :return: The DER encoded form of this name. 673 :rtype: :py:class:`bytes` 674 """ 675 result_buffer = _ffi.new('unsigned char**') 676 encode_result = _lib.i2d_X509_NAME(self._name, result_buffer) 677 _openssl_assert(encode_result >= 0) 678 679 string_result = _ffi.buffer(result_buffer[0], encode_result)[:] 680 _lib.OPENSSL_free(result_buffer[0]) 681 return string_result 682 683 def get_components(self): 684 """ 685 Returns the components of this name, as a sequence of 2-tuples. 686 687 :return: The components of this name. 688 :rtype: :py:class:`list` of ``name, value`` tuples. 689 """ 690 result = [] 691 for i in range(_lib.X509_NAME_entry_count(self._name)): 692 ent = _lib.X509_NAME_get_entry(self._name, i) 693 694 fname = _lib.X509_NAME_ENTRY_get_object(ent) 695 fval = _lib.X509_NAME_ENTRY_get_data(ent) 696 697 nid = _lib.OBJ_obj2nid(fname) 698 name = _lib.OBJ_nid2sn(nid) 699 700 # ffi.string does not handle strings containing NULL bytes 701 # (which may have been generated by old, broken software) 702 value = _ffi.buffer(_lib.ASN1_STRING_data(fval), 703 _lib.ASN1_STRING_length(fval))[:] 704 result.append((_ffi.string(name), value)) 705 706 return result 707 708 709X509NameType = deprecated( 710 X509Name, __name__, 711 "X509NameType has been deprecated, use X509Name instead", 712 DeprecationWarning 713) 714 715 716class X509Extension(object): 717 """ 718 An X.509 v3 certificate extension. 719 """ 720 721 def __init__(self, type_name, critical, value, subject=None, issuer=None): 722 """ 723 Initializes an X509 extension. 724 725 :param type_name: The name of the type of extension_ to create. 726 :type type_name: :py:data:`bytes` 727 728 :param bool critical: A flag indicating whether this is a critical 729 extension. 730 731 :param value: The value of the extension. 732 :type value: :py:data:`bytes` 733 734 :param subject: Optional X509 certificate to use as subject. 735 :type subject: :py:class:`X509` 736 737 :param issuer: Optional X509 certificate to use as issuer. 738 :type issuer: :py:class:`X509` 739 740 .. _extension: https://www.openssl.org/docs/manmaster/man5/ 741 x509v3_config.html#STANDARD-EXTENSIONS 742 """ 743 ctx = _ffi.new("X509V3_CTX*") 744 745 # A context is necessary for any extension which uses the r2i 746 # conversion method. That is, X509V3_EXT_nconf may segfault if passed 747 # a NULL ctx. Start off by initializing most of the fields to NULL. 748 _lib.X509V3_set_ctx(ctx, _ffi.NULL, _ffi.NULL, _ffi.NULL, _ffi.NULL, 0) 749 750 # We have no configuration database - but perhaps we should (some 751 # extensions may require it). 752 _lib.X509V3_set_ctx_nodb(ctx) 753 754 # Initialize the subject and issuer, if appropriate. ctx is a local, 755 # and as far as I can tell none of the X509V3_* APIs invoked here steal 756 # any references, so no need to mess with reference counts or 757 # duplicates. 758 if issuer is not None: 759 if not isinstance(issuer, X509): 760 raise TypeError("issuer must be an X509 instance") 761 ctx.issuer_cert = issuer._x509 762 if subject is not None: 763 if not isinstance(subject, X509): 764 raise TypeError("subject must be an X509 instance") 765 ctx.subject_cert = subject._x509 766 767 if critical: 768 # There are other OpenSSL APIs which would let us pass in critical 769 # separately, but they're harder to use, and since value is already 770 # a pile of crappy junk smuggling a ton of utterly important 771 # structured data, what's the point of trying to avoid nasty stuff 772 # with strings? (However, X509V3_EXT_i2d in particular seems like 773 # it would be a better API to invoke. I do not know where to get 774 # the ext_struc it desires for its last parameter, though.) 775 value = b"critical," + value 776 777 extension = _lib.X509V3_EXT_nconf(_ffi.NULL, ctx, type_name, value) 778 if extension == _ffi.NULL: 779 _raise_current_error() 780 self._extension = _ffi.gc(extension, _lib.X509_EXTENSION_free) 781 782 @property 783 def _nid(self): 784 return _lib.OBJ_obj2nid( 785 _lib.X509_EXTENSION_get_object(self._extension) 786 ) 787 788 _prefixes = { 789 _lib.GEN_EMAIL: "email", 790 _lib.GEN_DNS: "DNS", 791 _lib.GEN_URI: "URI", 792 } 793 794 def _subjectAltNameString(self): 795 names = _ffi.cast( 796 "GENERAL_NAMES*", _lib.X509V3_EXT_d2i(self._extension) 797 ) 798 799 names = _ffi.gc(names, _lib.GENERAL_NAMES_free) 800 parts = [] 801 for i in range(_lib.sk_GENERAL_NAME_num(names)): 802 name = _lib.sk_GENERAL_NAME_value(names, i) 803 try: 804 label = self._prefixes[name.type] 805 except KeyError: 806 bio = _new_mem_buf() 807 _lib.GENERAL_NAME_print(bio, name) 808 parts.append(_native(_bio_to_string(bio))) 809 else: 810 value = _native( 811 _ffi.buffer(name.d.ia5.data, name.d.ia5.length)[:]) 812 parts.append(label + ":" + value) 813 return ", ".join(parts) 814 815 def __str__(self): 816 """ 817 :return: a nice text representation of the extension 818 """ 819 if _lib.NID_subject_alt_name == self._nid: 820 return self._subjectAltNameString() 821 822 bio = _new_mem_buf() 823 print_result = _lib.X509V3_EXT_print(bio, self._extension, 0, 0) 824 _openssl_assert(print_result != 0) 825 826 return _native(_bio_to_string(bio)) 827 828 def get_critical(self): 829 """ 830 Returns the critical field of this X.509 extension. 831 832 :return: The critical field. 833 """ 834 return _lib.X509_EXTENSION_get_critical(self._extension) 835 836 def get_short_name(self): 837 """ 838 Returns the short type name of this X.509 extension. 839 840 The result is a byte string such as :py:const:`b"basicConstraints"`. 841 842 :return: The short type name. 843 :rtype: :py:data:`bytes` 844 845 .. versionadded:: 0.12 846 """ 847 obj = _lib.X509_EXTENSION_get_object(self._extension) 848 nid = _lib.OBJ_obj2nid(obj) 849 return _ffi.string(_lib.OBJ_nid2sn(nid)) 850 851 def get_data(self): 852 """ 853 Returns the data of the X509 extension, encoded as ASN.1. 854 855 :return: The ASN.1 encoded data of this X509 extension. 856 :rtype: :py:data:`bytes` 857 858 .. versionadded:: 0.12 859 """ 860 octet_result = _lib.X509_EXTENSION_get_data(self._extension) 861 string_result = _ffi.cast('ASN1_STRING*', octet_result) 862 char_result = _lib.ASN1_STRING_data(string_result) 863 result_length = _lib.ASN1_STRING_length(string_result) 864 return _ffi.buffer(char_result, result_length)[:] 865 866 867X509ExtensionType = deprecated( 868 X509Extension, __name__, 869 "X509ExtensionType has been deprecated, use X509Extension instead", 870 DeprecationWarning 871) 872 873 874class X509Req(object): 875 """ 876 An X.509 certificate signing requests. 877 """ 878 879 def __init__(self): 880 req = _lib.X509_REQ_new() 881 self._req = _ffi.gc(req, _lib.X509_REQ_free) 882 # Default to version 0. 883 self.set_version(0) 884 885 def to_cryptography(self): 886 """ 887 Export as a ``cryptography`` certificate signing request. 888 889 :rtype: ``cryptography.x509.CertificateSigningRequest`` 890 891 .. versionadded:: 17.1.0 892 """ 893 from cryptography.hazmat.backends.openssl.x509 import ( 894 _CertificateSigningRequest 895 ) 896 backend = _get_backend() 897 return _CertificateSigningRequest(backend, self._req) 898 899 @classmethod 900 def from_cryptography(cls, crypto_req): 901 """ 902 Construct based on a ``cryptography`` *crypto_req*. 903 904 :param crypto_req: A ``cryptography`` X.509 certificate signing request 905 :type crypto_req: ``cryptography.x509.CertificateSigningRequest`` 906 907 :rtype: X509Req 908 909 .. versionadded:: 17.1.0 910 """ 911 if not isinstance(crypto_req, x509.CertificateSigningRequest): 912 raise TypeError("Must be a certificate signing request") 913 914 req = cls() 915 req._req = crypto_req._x509_req 916 return req 917 918 def set_pubkey(self, pkey): 919 """ 920 Set the public key of the certificate signing request. 921 922 :param pkey: The public key to use. 923 :type pkey: :py:class:`PKey` 924 925 :return: ``None`` 926 """ 927 set_result = _lib.X509_REQ_set_pubkey(self._req, pkey._pkey) 928 _openssl_assert(set_result == 1) 929 930 def get_pubkey(self): 931 """ 932 Get the public key of the certificate signing request. 933 934 :return: The public key. 935 :rtype: :py:class:`PKey` 936 """ 937 pkey = PKey.__new__(PKey) 938 pkey._pkey = _lib.X509_REQ_get_pubkey(self._req) 939 _openssl_assert(pkey._pkey != _ffi.NULL) 940 pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free) 941 pkey._only_public = True 942 return pkey 943 944 def set_version(self, version): 945 """ 946 Set the version subfield (RFC 2459, section 4.1.2.1) of the certificate 947 request. 948 949 :param int version: The version number. 950 :return: ``None`` 951 """ 952 set_result = _lib.X509_REQ_set_version(self._req, version) 953 _openssl_assert(set_result == 1) 954 955 def get_version(self): 956 """ 957 Get the version subfield (RFC 2459, section 4.1.2.1) of the certificate 958 request. 959 960 :return: The value of the version subfield. 961 :rtype: :py:class:`int` 962 """ 963 return _lib.X509_REQ_get_version(self._req) 964 965 def get_subject(self): 966 """ 967 Return the subject of this certificate signing request. 968 969 This creates a new :class:`X509Name` that wraps the underlying subject 970 name field on the certificate signing request. Modifying it will modify 971 the underlying signing request, and will have the effect of modifying 972 any other :class:`X509Name` that refers to this subject. 973 974 :return: The subject of this certificate signing request. 975 :rtype: :class:`X509Name` 976 """ 977 name = X509Name.__new__(X509Name) 978 name._name = _lib.X509_REQ_get_subject_name(self._req) 979 _openssl_assert(name._name != _ffi.NULL) 980 981 # The name is owned by the X509Req structure. As long as the X509Name 982 # Python object is alive, keep the X509Req Python object alive. 983 name._owner = self 984 985 return name 986 987 def add_extensions(self, extensions): 988 """ 989 Add extensions to the certificate signing request. 990 991 :param extensions: The X.509 extensions to add. 992 :type extensions: iterable of :py:class:`X509Extension` 993 :return: ``None`` 994 """ 995 stack = _lib.sk_X509_EXTENSION_new_null() 996 _openssl_assert(stack != _ffi.NULL) 997 998 stack = _ffi.gc(stack, _lib.sk_X509_EXTENSION_free) 999 1000 for ext in extensions: 1001 if not isinstance(ext, X509Extension): 1002 raise ValueError("One of the elements is not an X509Extension") 1003 1004 # TODO push can fail (here and elsewhere) 1005 _lib.sk_X509_EXTENSION_push(stack, ext._extension) 1006 1007 add_result = _lib.X509_REQ_add_extensions(self._req, stack) 1008 _openssl_assert(add_result == 1) 1009 1010 def get_extensions(self): 1011 """ 1012 Get X.509 extensions in the certificate signing request. 1013 1014 :return: The X.509 extensions in this request. 1015 :rtype: :py:class:`list` of :py:class:`X509Extension` objects. 1016 1017 .. versionadded:: 0.15 1018 """ 1019 exts = [] 1020 native_exts_obj = _lib.X509_REQ_get_extensions(self._req) 1021 for i in range(_lib.sk_X509_EXTENSION_num(native_exts_obj)): 1022 ext = X509Extension.__new__(X509Extension) 1023 ext._extension = _lib.sk_X509_EXTENSION_value(native_exts_obj, i) 1024 exts.append(ext) 1025 return exts 1026 1027 def sign(self, pkey, digest): 1028 """ 1029 Sign the certificate signing request with this key and digest type. 1030 1031 :param pkey: The key pair to sign with. 1032 :type pkey: :py:class:`PKey` 1033 :param digest: The name of the message digest to use for the signature, 1034 e.g. :py:data:`b"sha256"`. 1035 :type digest: :py:class:`bytes` 1036 :return: ``None`` 1037 """ 1038 if pkey._only_public: 1039 raise ValueError("Key has only public part") 1040 1041 if not pkey._initialized: 1042 raise ValueError("Key is uninitialized") 1043 1044 digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest)) 1045 if digest_obj == _ffi.NULL: 1046 raise ValueError("No such digest method") 1047 1048 sign_result = _lib.X509_REQ_sign(self._req, pkey._pkey, digest_obj) 1049 _openssl_assert(sign_result > 0) 1050 1051 def verify(self, pkey): 1052 """ 1053 Verifies the signature on this certificate signing request. 1054 1055 :param PKey key: A public key. 1056 1057 :return: ``True`` if the signature is correct. 1058 :rtype: bool 1059 1060 :raises OpenSSL.crypto.Error: If the signature is invalid or there is a 1061 problem verifying the signature. 1062 """ 1063 if not isinstance(pkey, PKey): 1064 raise TypeError("pkey must be a PKey instance") 1065 1066 result = _lib.X509_REQ_verify(self._req, pkey._pkey) 1067 if result <= 0: 1068 _raise_current_error() 1069 1070 return result 1071 1072 1073X509ReqType = deprecated( 1074 X509Req, __name__, 1075 "X509ReqType has been deprecated, use X509Req instead", 1076 DeprecationWarning 1077) 1078 1079 1080class X509(object): 1081 """ 1082 An X.509 certificate. 1083 """ 1084 def __init__(self): 1085 x509 = _lib.X509_new() 1086 _openssl_assert(x509 != _ffi.NULL) 1087 self._x509 = _ffi.gc(x509, _lib.X509_free) 1088 1089 self._issuer_invalidator = _X509NameInvalidator() 1090 self._subject_invalidator = _X509NameInvalidator() 1091 1092 @classmethod 1093 def _from_raw_x509_ptr(cls, x509): 1094 cert = cls.__new__(cls) 1095 cert._x509 = _ffi.gc(x509, _lib.X509_free) 1096 cert._issuer_invalidator = _X509NameInvalidator() 1097 cert._subject_invalidator = _X509NameInvalidator() 1098 return cert 1099 1100 def to_cryptography(self): 1101 """ 1102 Export as a ``cryptography`` certificate. 1103 1104 :rtype: ``cryptography.x509.Certificate`` 1105 1106 .. versionadded:: 17.1.0 1107 """ 1108 from cryptography.hazmat.backends.openssl.x509 import _Certificate 1109 backend = _get_backend() 1110 return _Certificate(backend, self._x509) 1111 1112 @classmethod 1113 def from_cryptography(cls, crypto_cert): 1114 """ 1115 Construct based on a ``cryptography`` *crypto_cert*. 1116 1117 :param crypto_key: A ``cryptography`` X.509 certificate. 1118 :type crypto_key: ``cryptography.x509.Certificate`` 1119 1120 :rtype: X509 1121 1122 .. versionadded:: 17.1.0 1123 """ 1124 if not isinstance(crypto_cert, x509.Certificate): 1125 raise TypeError("Must be a certificate") 1126 1127 cert = cls() 1128 cert._x509 = crypto_cert._x509 1129 return cert 1130 1131 def set_version(self, version): 1132 """ 1133 Set the version number of the certificate. Note that the 1134 version value is zero-based, eg. a value of 0 is V1. 1135 1136 :param version: The version number of the certificate. 1137 :type version: :py:class:`int` 1138 1139 :return: ``None`` 1140 """ 1141 if not isinstance(version, int): 1142 raise TypeError("version must be an integer") 1143 1144 _lib.X509_set_version(self._x509, version) 1145 1146 def get_version(self): 1147 """ 1148 Return the version number of the certificate. 1149 1150 :return: The version number of the certificate. 1151 :rtype: :py:class:`int` 1152 """ 1153 return _lib.X509_get_version(self._x509) 1154 1155 def get_pubkey(self): 1156 """ 1157 Get the public key of the certificate. 1158 1159 :return: The public key. 1160 :rtype: :py:class:`PKey` 1161 """ 1162 pkey = PKey.__new__(PKey) 1163 pkey._pkey = _lib.X509_get_pubkey(self._x509) 1164 if pkey._pkey == _ffi.NULL: 1165 _raise_current_error() 1166 pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free) 1167 pkey._only_public = True 1168 return pkey 1169 1170 def set_pubkey(self, pkey): 1171 """ 1172 Set the public key of the certificate. 1173 1174 :param pkey: The public key. 1175 :type pkey: :py:class:`PKey` 1176 1177 :return: :py:data:`None` 1178 """ 1179 if not isinstance(pkey, PKey): 1180 raise TypeError("pkey must be a PKey instance") 1181 1182 set_result = _lib.X509_set_pubkey(self._x509, pkey._pkey) 1183 _openssl_assert(set_result == 1) 1184 1185 def sign(self, pkey, digest): 1186 """ 1187 Sign the certificate with this key and digest type. 1188 1189 :param pkey: The key to sign with. 1190 :type pkey: :py:class:`PKey` 1191 1192 :param digest: The name of the message digest to use. 1193 :type digest: :py:class:`bytes` 1194 1195 :return: :py:data:`None` 1196 """ 1197 if not isinstance(pkey, PKey): 1198 raise TypeError("pkey must be a PKey instance") 1199 1200 if pkey._only_public: 1201 raise ValueError("Key only has public part") 1202 1203 if not pkey._initialized: 1204 raise ValueError("Key is uninitialized") 1205 1206 evp_md = _lib.EVP_get_digestbyname(_byte_string(digest)) 1207 if evp_md == _ffi.NULL: 1208 raise ValueError("No such digest method") 1209 1210 sign_result = _lib.X509_sign(self._x509, pkey._pkey, evp_md) 1211 _openssl_assert(sign_result > 0) 1212 1213 def get_signature_algorithm(self): 1214 """ 1215 Return the signature algorithm used in the certificate. 1216 1217 :return: The name of the algorithm. 1218 :rtype: :py:class:`bytes` 1219 1220 :raises ValueError: If the signature algorithm is undefined. 1221 1222 .. versionadded:: 0.13 1223 """ 1224 algor = _lib.X509_get0_tbs_sigalg(self._x509) 1225 nid = _lib.OBJ_obj2nid(algor.algorithm) 1226 if nid == _lib.NID_undef: 1227 raise ValueError("Undefined signature algorithm") 1228 return _ffi.string(_lib.OBJ_nid2ln(nid)) 1229 1230 def digest(self, digest_name): 1231 """ 1232 Return the digest of the X509 object. 1233 1234 :param digest_name: The name of the digest algorithm to use. 1235 :type digest_name: :py:class:`bytes` 1236 1237 :return: The digest of the object, formatted as 1238 :py:const:`b":"`-delimited hex pairs. 1239 :rtype: :py:class:`bytes` 1240 """ 1241 digest = _lib.EVP_get_digestbyname(_byte_string(digest_name)) 1242 if digest == _ffi.NULL: 1243 raise ValueError("No such digest method") 1244 1245 result_buffer = _ffi.new("unsigned char[]", _lib.EVP_MAX_MD_SIZE) 1246 result_length = _ffi.new("unsigned int[]", 1) 1247 result_length[0] = len(result_buffer) 1248 1249 digest_result = _lib.X509_digest( 1250 self._x509, digest, result_buffer, result_length) 1251 _openssl_assert(digest_result == 1) 1252 1253 return b":".join([ 1254 b16encode(ch).upper() for ch 1255 in _ffi.buffer(result_buffer, result_length[0])]) 1256 1257 def subject_name_hash(self): 1258 """ 1259 Return the hash of the X509 subject. 1260 1261 :return: The hash of the subject. 1262 :rtype: :py:class:`bytes` 1263 """ 1264 return _lib.X509_subject_name_hash(self._x509) 1265 1266 def set_serial_number(self, serial): 1267 """ 1268 Set the serial number of the certificate. 1269 1270 :param serial: The new serial number. 1271 :type serial: :py:class:`int` 1272 1273 :return: :py:data`None` 1274 """ 1275 if not isinstance(serial, _integer_types): 1276 raise TypeError("serial must be an integer") 1277 1278 hex_serial = hex(serial)[2:] 1279 if not isinstance(hex_serial, bytes): 1280 hex_serial = hex_serial.encode('ascii') 1281 1282 bignum_serial = _ffi.new("BIGNUM**") 1283 1284 # BN_hex2bn stores the result in &bignum. Unless it doesn't feel like 1285 # it. If bignum is still NULL after this call, then the return value 1286 # is actually the result. I hope. -exarkun 1287 small_serial = _lib.BN_hex2bn(bignum_serial, hex_serial) 1288 1289 if bignum_serial[0] == _ffi.NULL: 1290 set_result = _lib.ASN1_INTEGER_set( 1291 _lib.X509_get_serialNumber(self._x509), small_serial) 1292 if set_result: 1293 # TODO Not tested 1294 _raise_current_error() 1295 else: 1296 asn1_serial = _lib.BN_to_ASN1_INTEGER(bignum_serial[0], _ffi.NULL) 1297 _lib.BN_free(bignum_serial[0]) 1298 if asn1_serial == _ffi.NULL: 1299 # TODO Not tested 1300 _raise_current_error() 1301 asn1_serial = _ffi.gc(asn1_serial, _lib.ASN1_INTEGER_free) 1302 set_result = _lib.X509_set_serialNumber(self._x509, asn1_serial) 1303 _openssl_assert(set_result == 1) 1304 1305 def get_serial_number(self): 1306 """ 1307 Return the serial number of this certificate. 1308 1309 :return: The serial number. 1310 :rtype: int 1311 """ 1312 asn1_serial = _lib.X509_get_serialNumber(self._x509) 1313 bignum_serial = _lib.ASN1_INTEGER_to_BN(asn1_serial, _ffi.NULL) 1314 try: 1315 hex_serial = _lib.BN_bn2hex(bignum_serial) 1316 try: 1317 hexstring_serial = _ffi.string(hex_serial) 1318 serial = int(hexstring_serial, 16) 1319 return serial 1320 finally: 1321 _lib.OPENSSL_free(hex_serial) 1322 finally: 1323 _lib.BN_free(bignum_serial) 1324 1325 def gmtime_adj_notAfter(self, amount): 1326 """ 1327 Adjust the time stamp on which the certificate stops being valid. 1328 1329 :param int amount: The number of seconds by which to adjust the 1330 timestamp. 1331 :return: ``None`` 1332 """ 1333 if not isinstance(amount, int): 1334 raise TypeError("amount must be an integer") 1335 1336 notAfter = _lib.X509_get_notAfter(self._x509) 1337 _lib.X509_gmtime_adj(notAfter, amount) 1338 1339 def gmtime_adj_notBefore(self, amount): 1340 """ 1341 Adjust the timestamp on which the certificate starts being valid. 1342 1343 :param amount: The number of seconds by which to adjust the timestamp. 1344 :return: ``None`` 1345 """ 1346 if not isinstance(amount, int): 1347 raise TypeError("amount must be an integer") 1348 1349 notBefore = _lib.X509_get_notBefore(self._x509) 1350 _lib.X509_gmtime_adj(notBefore, amount) 1351 1352 def has_expired(self): 1353 """ 1354 Check whether the certificate has expired. 1355 1356 :return: ``True`` if the certificate has expired, ``False`` otherwise. 1357 :rtype: bool 1358 """ 1359 time_string = _native(self.get_notAfter()) 1360 not_after = datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ") 1361 1362 return not_after < datetime.datetime.utcnow() 1363 1364 def _get_boundary_time(self, which): 1365 return _get_asn1_time(which(self._x509)) 1366 1367 def get_notBefore(self): 1368 """ 1369 Get the timestamp at which the certificate starts being valid. 1370 1371 The timestamp is formatted as an ASN.1 TIME:: 1372 1373 YYYYMMDDhhmmssZ 1374 1375 :return: A timestamp string, or ``None`` if there is none. 1376 :rtype: bytes or NoneType 1377 """ 1378 return self._get_boundary_time(_lib.X509_get_notBefore) 1379 1380 def _set_boundary_time(self, which, when): 1381 return _set_asn1_time(which(self._x509), when) 1382 1383 def set_notBefore(self, when): 1384 """ 1385 Set the timestamp at which the certificate starts being valid. 1386 1387 The timestamp is formatted as an ASN.1 TIME:: 1388 1389 YYYYMMDDhhmmssZ 1390 1391 :param bytes when: A timestamp string. 1392 :return: ``None`` 1393 """ 1394 return self._set_boundary_time(_lib.X509_get_notBefore, when) 1395 1396 def get_notAfter(self): 1397 """ 1398 Get the timestamp at which the certificate stops being valid. 1399 1400 The timestamp is formatted as an ASN.1 TIME:: 1401 1402 YYYYMMDDhhmmssZ 1403 1404 :return: A timestamp string, or ``None`` if there is none. 1405 :rtype: bytes or NoneType 1406 """ 1407 return self._get_boundary_time(_lib.X509_get_notAfter) 1408 1409 def set_notAfter(self, when): 1410 """ 1411 Set the timestamp at which the certificate stops being valid. 1412 1413 The timestamp is formatted as an ASN.1 TIME:: 1414 1415 YYYYMMDDhhmmssZ 1416 1417 :param bytes when: A timestamp string. 1418 :return: ``None`` 1419 """ 1420 return self._set_boundary_time(_lib.X509_get_notAfter, when) 1421 1422 def _get_name(self, which): 1423 name = X509Name.__new__(X509Name) 1424 name._name = which(self._x509) 1425 _openssl_assert(name._name != _ffi.NULL) 1426 1427 # The name is owned by the X509 structure. As long as the X509Name 1428 # Python object is alive, keep the X509 Python object alive. 1429 name._owner = self 1430 1431 return name 1432 1433 def _set_name(self, which, name): 1434 if not isinstance(name, X509Name): 1435 raise TypeError("name must be an X509Name") 1436 set_result = which(self._x509, name._name) 1437 _openssl_assert(set_result == 1) 1438 1439 def get_issuer(self): 1440 """ 1441 Return the issuer of this certificate. 1442 1443 This creates a new :class:`X509Name` that wraps the underlying issuer 1444 name field on the certificate. Modifying it will modify the underlying 1445 certificate, and will have the effect of modifying any other 1446 :class:`X509Name` that refers to this issuer. 1447 1448 :return: The issuer of this certificate. 1449 :rtype: :class:`X509Name` 1450 """ 1451 name = self._get_name(_lib.X509_get_issuer_name) 1452 self._issuer_invalidator.add(name) 1453 return name 1454 1455 def set_issuer(self, issuer): 1456 """ 1457 Set the issuer of this certificate. 1458 1459 :param issuer: The issuer. 1460 :type issuer: :py:class:`X509Name` 1461 1462 :return: ``None`` 1463 """ 1464 self._set_name(_lib.X509_set_issuer_name, issuer) 1465 self._issuer_invalidator.clear() 1466 1467 def get_subject(self): 1468 """ 1469 Return the subject of this certificate. 1470 1471 This creates a new :class:`X509Name` that wraps the underlying subject 1472 name field on the certificate. Modifying it will modify the underlying 1473 certificate, and will have the effect of modifying any other 1474 :class:`X509Name` that refers to this subject. 1475 1476 :return: The subject of this certificate. 1477 :rtype: :class:`X509Name` 1478 """ 1479 name = self._get_name(_lib.X509_get_subject_name) 1480 self._subject_invalidator.add(name) 1481 return name 1482 1483 def set_subject(self, subject): 1484 """ 1485 Set the subject of this certificate. 1486 1487 :param subject: The subject. 1488 :type subject: :py:class:`X509Name` 1489 1490 :return: ``None`` 1491 """ 1492 self._set_name(_lib.X509_set_subject_name, subject) 1493 self._subject_invalidator.clear() 1494 1495 def get_extension_count(self): 1496 """ 1497 Get the number of extensions on this certificate. 1498 1499 :return: The number of extensions. 1500 :rtype: :py:class:`int` 1501 1502 .. versionadded:: 0.12 1503 """ 1504 return _lib.X509_get_ext_count(self._x509) 1505 1506 def add_extensions(self, extensions): 1507 """ 1508 Add extensions to the certificate. 1509 1510 :param extensions: The extensions to add. 1511 :type extensions: An iterable of :py:class:`X509Extension` objects. 1512 :return: ``None`` 1513 """ 1514 for ext in extensions: 1515 if not isinstance(ext, X509Extension): 1516 raise ValueError("One of the elements is not an X509Extension") 1517 1518 add_result = _lib.X509_add_ext(self._x509, ext._extension, -1) 1519 if not add_result: 1520 _raise_current_error() 1521 1522 def get_extension(self, index): 1523 """ 1524 Get a specific extension of the certificate by index. 1525 1526 Extensions on a certificate are kept in order. The index 1527 parameter selects which extension will be returned. 1528 1529 :param int index: The index of the extension to retrieve. 1530 :return: The extension at the specified index. 1531 :rtype: :py:class:`X509Extension` 1532 :raises IndexError: If the extension index was out of bounds. 1533 1534 .. versionadded:: 0.12 1535 """ 1536 ext = X509Extension.__new__(X509Extension) 1537 ext._extension = _lib.X509_get_ext(self._x509, index) 1538 if ext._extension == _ffi.NULL: 1539 raise IndexError("extension index out of bounds") 1540 1541 extension = _lib.X509_EXTENSION_dup(ext._extension) 1542 ext._extension = _ffi.gc(extension, _lib.X509_EXTENSION_free) 1543 return ext 1544 1545 1546X509Type = deprecated( 1547 X509, __name__, 1548 "X509Type has been deprecated, use X509 instead", 1549 DeprecationWarning 1550) 1551 1552 1553class X509StoreFlags(object): 1554 """ 1555 Flags for X509 verification, used to change the behavior of 1556 :class:`X509Store`. 1557 1558 See `OpenSSL Verification Flags`_ for details. 1559 1560 .. _OpenSSL Verification Flags: 1561 https://www.openssl.org/docs/manmaster/man3/X509_VERIFY_PARAM_set_flags.html 1562 """ 1563 CRL_CHECK = _lib.X509_V_FLAG_CRL_CHECK 1564 CRL_CHECK_ALL = _lib.X509_V_FLAG_CRL_CHECK_ALL 1565 IGNORE_CRITICAL = _lib.X509_V_FLAG_IGNORE_CRITICAL 1566 X509_STRICT = _lib.X509_V_FLAG_X509_STRICT 1567 ALLOW_PROXY_CERTS = _lib.X509_V_FLAG_ALLOW_PROXY_CERTS 1568 POLICY_CHECK = _lib.X509_V_FLAG_POLICY_CHECK 1569 EXPLICIT_POLICY = _lib.X509_V_FLAG_EXPLICIT_POLICY 1570 INHIBIT_MAP = _lib.X509_V_FLAG_INHIBIT_MAP 1571 NOTIFY_POLICY = _lib.X509_V_FLAG_NOTIFY_POLICY 1572 CHECK_SS_SIGNATURE = _lib.X509_V_FLAG_CHECK_SS_SIGNATURE 1573 CB_ISSUER_CHECK = _lib.X509_V_FLAG_CB_ISSUER_CHECK 1574 1575 1576class X509Store(object): 1577 """ 1578 An X.509 store. 1579 1580 An X.509 store is used to describe a context in which to verify a 1581 certificate. A description of a context may include a set of certificates 1582 to trust, a set of certificate revocation lists, verification flags and 1583 more. 1584 1585 An X.509 store, being only a description, cannot be used by itself to 1586 verify a certificate. To carry out the actual verification process, see 1587 :class:`X509StoreContext`. 1588 """ 1589 1590 def __init__(self): 1591 store = _lib.X509_STORE_new() 1592 self._store = _ffi.gc(store, _lib.X509_STORE_free) 1593 1594 def add_cert(self, cert): 1595 """ 1596 Adds a trusted certificate to this store. 1597 1598 Adding a certificate with this method adds this certificate as a 1599 *trusted* certificate. 1600 1601 :param X509 cert: The certificate to add to this store. 1602 1603 :raises TypeError: If the certificate is not an :class:`X509`. 1604 1605 :raises OpenSSL.crypto.Error: If OpenSSL was unhappy with your 1606 certificate. 1607 1608 :return: ``None`` if the certificate was added successfully. 1609 """ 1610 if not isinstance(cert, X509): 1611 raise TypeError() 1612 1613 # As of OpenSSL 1.1.0i adding the same cert to the store more than 1614 # once doesn't cause an error. Accordingly, this code now silences 1615 # the error for OpenSSL < 1.1.0i as well. 1616 if _lib.X509_STORE_add_cert(self._store, cert._x509) == 0: 1617 code = _lib.ERR_peek_error() 1618 err_reason = _lib.ERR_GET_REASON(code) 1619 _openssl_assert( 1620 err_reason == _lib.X509_R_CERT_ALREADY_IN_HASH_TABLE 1621 ) 1622 _lib.ERR_clear_error() 1623 1624 def add_crl(self, crl): 1625 """ 1626 Add a certificate revocation list to this store. 1627 1628 The certificate revocation lists added to a store will only be used if 1629 the associated flags are configured to check certificate revocation 1630 lists. 1631 1632 .. versionadded:: 16.1.0 1633 1634 :param CRL crl: The certificate revocation list to add to this store. 1635 :return: ``None`` if the certificate revocation list was added 1636 successfully. 1637 """ 1638 _openssl_assert(_lib.X509_STORE_add_crl(self._store, crl._crl) != 0) 1639 1640 def set_flags(self, flags): 1641 """ 1642 Set verification flags to this store. 1643 1644 Verification flags can be combined by oring them together. 1645 1646 .. note:: 1647 1648 Setting a verification flag sometimes requires clients to add 1649 additional information to the store, otherwise a suitable error will 1650 be raised. 1651 1652 For example, in setting flags to enable CRL checking a 1653 suitable CRL must be added to the store otherwise an error will be 1654 raised. 1655 1656 .. versionadded:: 16.1.0 1657 1658 :param int flags: The verification flags to set on this store. 1659 See :class:`X509StoreFlags` for available constants. 1660 :return: ``None`` if the verification flags were successfully set. 1661 """ 1662 _openssl_assert(_lib.X509_STORE_set_flags(self._store, flags) != 0) 1663 1664 def set_time(self, vfy_time): 1665 """ 1666 Set the time against which the certificates are verified. 1667 1668 Normally the current time is used. 1669 1670 .. note:: 1671 1672 For example, you can determine if a certificate was valid at a given 1673 time. 1674 1675 .. versionadded:: 17.0.0 1676 1677 :param datetime vfy_time: The verification time to set on this store. 1678 :return: ``None`` if the verification time was successfully set. 1679 """ 1680 param = _lib.X509_VERIFY_PARAM_new() 1681 param = _ffi.gc(param, _lib.X509_VERIFY_PARAM_free) 1682 1683 _lib.X509_VERIFY_PARAM_set_time(param, int(vfy_time.strftime('%s'))) 1684 _openssl_assert(_lib.X509_STORE_set1_param(self._store, param) != 0) 1685 1686 1687X509StoreType = deprecated( 1688 X509Store, __name__, 1689 "X509StoreType has been deprecated, use X509Store instead", 1690 DeprecationWarning 1691) 1692 1693 1694class X509StoreContextError(Exception): 1695 """ 1696 An exception raised when an error occurred while verifying a certificate 1697 using `OpenSSL.X509StoreContext.verify_certificate`. 1698 1699 :ivar certificate: The certificate which caused verificate failure. 1700 :type certificate: :class:`X509` 1701 """ 1702 1703 def __init__(self, message, certificate): 1704 super(X509StoreContextError, self).__init__(message) 1705 self.certificate = certificate 1706 1707 1708class X509StoreContext(object): 1709 """ 1710 An X.509 store context. 1711 1712 An X.509 store context is used to carry out the actual verification process 1713 of a certificate in a described context. For describing such a context, see 1714 :class:`X509Store`. 1715 1716 :ivar _store_ctx: The underlying X509_STORE_CTX structure used by this 1717 instance. It is dynamically allocated and automatically garbage 1718 collected. 1719 :ivar _store: See the ``store`` ``__init__`` parameter. 1720 :ivar _cert: See the ``certificate`` ``__init__`` parameter. 1721 :param X509Store store: The certificates which will be trusted for the 1722 purposes of any verifications. 1723 :param X509 certificate: The certificate to be verified. 1724 """ 1725 1726 def __init__(self, store, certificate): 1727 store_ctx = _lib.X509_STORE_CTX_new() 1728 self._store_ctx = _ffi.gc(store_ctx, _lib.X509_STORE_CTX_free) 1729 self._store = store 1730 self._cert = certificate 1731 # Make the store context available for use after instantiating this 1732 # class by initializing it now. Per testing, subsequent calls to 1733 # :meth:`_init` have no adverse affect. 1734 self._init() 1735 1736 def _init(self): 1737 """ 1738 Set up the store context for a subsequent verification operation. 1739 1740 Calling this method more than once without first calling 1741 :meth:`_cleanup` will leak memory. 1742 """ 1743 ret = _lib.X509_STORE_CTX_init( 1744 self._store_ctx, self._store._store, self._cert._x509, _ffi.NULL 1745 ) 1746 if ret <= 0: 1747 _raise_current_error() 1748 1749 def _cleanup(self): 1750 """ 1751 Internally cleans up the store context. 1752 1753 The store context can then be reused with a new call to :meth:`_init`. 1754 """ 1755 _lib.X509_STORE_CTX_cleanup(self._store_ctx) 1756 1757 def _exception_from_context(self): 1758 """ 1759 Convert an OpenSSL native context error failure into a Python 1760 exception. 1761 1762 When a call to native OpenSSL X509_verify_cert fails, additional 1763 information about the failure can be obtained from the store context. 1764 """ 1765 errors = [ 1766 _lib.X509_STORE_CTX_get_error(self._store_ctx), 1767 _lib.X509_STORE_CTX_get_error_depth(self._store_ctx), 1768 _native(_ffi.string(_lib.X509_verify_cert_error_string( 1769 _lib.X509_STORE_CTX_get_error(self._store_ctx)))), 1770 ] 1771 # A context error should always be associated with a certificate, so we 1772 # expect this call to never return :class:`None`. 1773 _x509 = _lib.X509_STORE_CTX_get_current_cert(self._store_ctx) 1774 _cert = _lib.X509_dup(_x509) 1775 pycert = X509._from_raw_x509_ptr(_cert) 1776 return X509StoreContextError(errors, pycert) 1777 1778 def set_store(self, store): 1779 """ 1780 Set the context's X.509 store. 1781 1782 .. versionadded:: 0.15 1783 1784 :param X509Store store: The store description which will be used for 1785 the purposes of any *future* verifications. 1786 """ 1787 self._store = store 1788 1789 def verify_certificate(self): 1790 """ 1791 Verify a certificate in a context. 1792 1793 .. versionadded:: 0.15 1794 1795 :raises X509StoreContextError: If an error occurred when validating a 1796 certificate in the context. Sets ``certificate`` attribute to 1797 indicate which certificate caused the error. 1798 """ 1799 # Always re-initialize the store context in case 1800 # :meth:`verify_certificate` is called multiple times. 1801 # 1802 # :meth:`_init` is called in :meth:`__init__` so _cleanup is called 1803 # before _init to ensure memory is not leaked. 1804 self._cleanup() 1805 self._init() 1806 ret = _lib.X509_verify_cert(self._store_ctx) 1807 self._cleanup() 1808 if ret <= 0: 1809 raise self._exception_from_context() 1810 1811 1812def load_certificate(type, buffer): 1813 """ 1814 Load a certificate (X509) from the string *buffer* encoded with the 1815 type *type*. 1816 1817 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1) 1818 1819 :param bytes buffer: The buffer the certificate is stored in 1820 1821 :return: The X509 object 1822 """ 1823 if isinstance(buffer, _text_type): 1824 buffer = buffer.encode("ascii") 1825 1826 bio = _new_mem_buf(buffer) 1827 1828 if type == FILETYPE_PEM: 1829 x509 = _lib.PEM_read_bio_X509(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL) 1830 elif type == FILETYPE_ASN1: 1831 x509 = _lib.d2i_X509_bio(bio, _ffi.NULL) 1832 else: 1833 raise ValueError( 1834 "type argument must be FILETYPE_PEM or FILETYPE_ASN1") 1835 1836 if x509 == _ffi.NULL: 1837 _raise_current_error() 1838 1839 return X509._from_raw_x509_ptr(x509) 1840 1841 1842def dump_certificate(type, cert): 1843 """ 1844 Dump the certificate *cert* into a buffer string encoded with the type 1845 *type*. 1846 1847 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1, or 1848 FILETYPE_TEXT) 1849 :param cert: The certificate to dump 1850 :return: The buffer with the dumped certificate in 1851 """ 1852 bio = _new_mem_buf() 1853 1854 if type == FILETYPE_PEM: 1855 result_code = _lib.PEM_write_bio_X509(bio, cert._x509) 1856 elif type == FILETYPE_ASN1: 1857 result_code = _lib.i2d_X509_bio(bio, cert._x509) 1858 elif type == FILETYPE_TEXT: 1859 result_code = _lib.X509_print_ex(bio, cert._x509, 0, 0) 1860 else: 1861 raise ValueError( 1862 "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or " 1863 "FILETYPE_TEXT") 1864 1865 assert result_code == 1 1866 return _bio_to_string(bio) 1867 1868 1869def dump_publickey(type, pkey): 1870 """ 1871 Dump a public key to a buffer. 1872 1873 :param type: The file type (one of :data:`FILETYPE_PEM` or 1874 :data:`FILETYPE_ASN1`). 1875 :param PKey pkey: The public key to dump 1876 :return: The buffer with the dumped key in it. 1877 :rtype: bytes 1878 """ 1879 bio = _new_mem_buf() 1880 if type == FILETYPE_PEM: 1881 write_bio = _lib.PEM_write_bio_PUBKEY 1882 elif type == FILETYPE_ASN1: 1883 write_bio = _lib.i2d_PUBKEY_bio 1884 else: 1885 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1") 1886 1887 result_code = write_bio(bio, pkey._pkey) 1888 if result_code != 1: # pragma: no cover 1889 _raise_current_error() 1890 1891 return _bio_to_string(bio) 1892 1893 1894def dump_privatekey(type, pkey, cipher=None, passphrase=None): 1895 """ 1896 Dump the private key *pkey* into a buffer string encoded with the type 1897 *type*. Optionally (if *type* is :const:`FILETYPE_PEM`) encrypting it 1898 using *cipher* and *passphrase*. 1899 1900 :param type: The file type (one of :const:`FILETYPE_PEM`, 1901 :const:`FILETYPE_ASN1`, or :const:`FILETYPE_TEXT`) 1902 :param PKey pkey: The PKey to dump 1903 :param cipher: (optional) if encrypted PEM format, the cipher to use 1904 :param passphrase: (optional) if encrypted PEM format, this can be either 1905 the passphrase to use, or a callback for providing the passphrase. 1906 1907 :return: The buffer with the dumped key in 1908 :rtype: bytes 1909 """ 1910 bio = _new_mem_buf() 1911 1912 if not isinstance(pkey, PKey): 1913 raise TypeError("pkey must be a PKey") 1914 1915 if cipher is not None: 1916 if passphrase is None: 1917 raise TypeError( 1918 "if a value is given for cipher " 1919 "one must also be given for passphrase") 1920 cipher_obj = _lib.EVP_get_cipherbyname(_byte_string(cipher)) 1921 if cipher_obj == _ffi.NULL: 1922 raise ValueError("Invalid cipher name") 1923 else: 1924 cipher_obj = _ffi.NULL 1925 1926 helper = _PassphraseHelper(type, passphrase) 1927 if type == FILETYPE_PEM: 1928 result_code = _lib.PEM_write_bio_PrivateKey( 1929 bio, pkey._pkey, cipher_obj, _ffi.NULL, 0, 1930 helper.callback, helper.callback_args) 1931 helper.raise_if_problem() 1932 elif type == FILETYPE_ASN1: 1933 result_code = _lib.i2d_PrivateKey_bio(bio, pkey._pkey) 1934 elif type == FILETYPE_TEXT: 1935 if _lib.EVP_PKEY_id(pkey._pkey) != _lib.EVP_PKEY_RSA: 1936 raise TypeError("Only RSA keys are supported for FILETYPE_TEXT") 1937 1938 rsa = _ffi.gc( 1939 _lib.EVP_PKEY_get1_RSA(pkey._pkey), 1940 _lib.RSA_free 1941 ) 1942 result_code = _lib.RSA_print(bio, rsa, 0) 1943 else: 1944 raise ValueError( 1945 "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or " 1946 "FILETYPE_TEXT") 1947 1948 _openssl_assert(result_code != 0) 1949 1950 return _bio_to_string(bio) 1951 1952 1953class Revoked(object): 1954 """ 1955 A certificate revocation. 1956 """ 1957 # https://www.openssl.org/docs/manmaster/man5/x509v3_config.html#CRL-distribution-points 1958 # which differs from crl_reasons of crypto/x509v3/v3_enum.c that matches 1959 # OCSP_crl_reason_str. We use the latter, just like the command line 1960 # program. 1961 _crl_reasons = [ 1962 b"unspecified", 1963 b"keyCompromise", 1964 b"CACompromise", 1965 b"affiliationChanged", 1966 b"superseded", 1967 b"cessationOfOperation", 1968 b"certificateHold", 1969 # b"removeFromCRL", 1970 ] 1971 1972 def __init__(self): 1973 revoked = _lib.X509_REVOKED_new() 1974 self._revoked = _ffi.gc(revoked, _lib.X509_REVOKED_free) 1975 1976 def set_serial(self, hex_str): 1977 """ 1978 Set the serial number. 1979 1980 The serial number is formatted as a hexadecimal number encoded in 1981 ASCII. 1982 1983 :param bytes hex_str: The new serial number. 1984 1985 :return: ``None`` 1986 """ 1987 bignum_serial = _ffi.gc(_lib.BN_new(), _lib.BN_free) 1988 bignum_ptr = _ffi.new("BIGNUM**") 1989 bignum_ptr[0] = bignum_serial 1990 bn_result = _lib.BN_hex2bn(bignum_ptr, hex_str) 1991 if not bn_result: 1992 raise ValueError("bad hex string") 1993 1994 asn1_serial = _ffi.gc( 1995 _lib.BN_to_ASN1_INTEGER(bignum_serial, _ffi.NULL), 1996 _lib.ASN1_INTEGER_free) 1997 _lib.X509_REVOKED_set_serialNumber(self._revoked, asn1_serial) 1998 1999 def get_serial(self): 2000 """ 2001 Get the serial number. 2002 2003 The serial number is formatted as a hexadecimal number encoded in 2004 ASCII. 2005 2006 :return: The serial number. 2007 :rtype: bytes 2008 """ 2009 bio = _new_mem_buf() 2010 2011 asn1_int = _lib.X509_REVOKED_get0_serialNumber(self._revoked) 2012 _openssl_assert(asn1_int != _ffi.NULL) 2013 result = _lib.i2a_ASN1_INTEGER(bio, asn1_int) 2014 _openssl_assert(result >= 0) 2015 return _bio_to_string(bio) 2016 2017 def _delete_reason(self): 2018 for i in range(_lib.X509_REVOKED_get_ext_count(self._revoked)): 2019 ext = _lib.X509_REVOKED_get_ext(self._revoked, i) 2020 obj = _lib.X509_EXTENSION_get_object(ext) 2021 if _lib.OBJ_obj2nid(obj) == _lib.NID_crl_reason: 2022 _lib.X509_EXTENSION_free(ext) 2023 _lib.X509_REVOKED_delete_ext(self._revoked, i) 2024 break 2025 2026 def set_reason(self, reason): 2027 """ 2028 Set the reason of this revocation. 2029 2030 If :data:`reason` is ``None``, delete the reason instead. 2031 2032 :param reason: The reason string. 2033 :type reason: :class:`bytes` or :class:`NoneType` 2034 2035 :return: ``None`` 2036 2037 .. seealso:: 2038 2039 :meth:`all_reasons`, which gives you a list of all supported 2040 reasons which you might pass to this method. 2041 """ 2042 if reason is None: 2043 self._delete_reason() 2044 elif not isinstance(reason, bytes): 2045 raise TypeError("reason must be None or a byte string") 2046 else: 2047 reason = reason.lower().replace(b' ', b'') 2048 reason_code = [r.lower() for r in self._crl_reasons].index(reason) 2049 2050 new_reason_ext = _lib.ASN1_ENUMERATED_new() 2051 _openssl_assert(new_reason_ext != _ffi.NULL) 2052 new_reason_ext = _ffi.gc(new_reason_ext, _lib.ASN1_ENUMERATED_free) 2053 2054 set_result = _lib.ASN1_ENUMERATED_set(new_reason_ext, reason_code) 2055 _openssl_assert(set_result != _ffi.NULL) 2056 2057 self._delete_reason() 2058 add_result = _lib.X509_REVOKED_add1_ext_i2d( 2059 self._revoked, _lib.NID_crl_reason, new_reason_ext, 0, 0) 2060 _openssl_assert(add_result == 1) 2061 2062 def get_reason(self): 2063 """ 2064 Get the reason of this revocation. 2065 2066 :return: The reason, or ``None`` if there is none. 2067 :rtype: bytes or NoneType 2068 2069 .. seealso:: 2070 2071 :meth:`all_reasons`, which gives you a list of all supported 2072 reasons this method might return. 2073 """ 2074 for i in range(_lib.X509_REVOKED_get_ext_count(self._revoked)): 2075 ext = _lib.X509_REVOKED_get_ext(self._revoked, i) 2076 obj = _lib.X509_EXTENSION_get_object(ext) 2077 if _lib.OBJ_obj2nid(obj) == _lib.NID_crl_reason: 2078 bio = _new_mem_buf() 2079 2080 print_result = _lib.X509V3_EXT_print(bio, ext, 0, 0) 2081 if not print_result: 2082 print_result = _lib.M_ASN1_OCTET_STRING_print( 2083 bio, _lib.X509_EXTENSION_get_data(ext) 2084 ) 2085 _openssl_assert(print_result != 0) 2086 2087 return _bio_to_string(bio) 2088 2089 def all_reasons(self): 2090 """ 2091 Return a list of all the supported reason strings. 2092 2093 This list is a copy; modifying it does not change the supported reason 2094 strings. 2095 2096 :return: A list of reason strings. 2097 :rtype: :class:`list` of :class:`bytes` 2098 """ 2099 return self._crl_reasons[:] 2100 2101 def set_rev_date(self, when): 2102 """ 2103 Set the revocation timestamp. 2104 2105 :param bytes when: The timestamp of the revocation, 2106 as ASN.1 TIME. 2107 :return: ``None`` 2108 """ 2109 dt = _lib.X509_REVOKED_get0_revocationDate(self._revoked) 2110 return _set_asn1_time(dt, when) 2111 2112 def get_rev_date(self): 2113 """ 2114 Get the revocation timestamp. 2115 2116 :return: The timestamp of the revocation, as ASN.1 TIME. 2117 :rtype: bytes 2118 """ 2119 dt = _lib.X509_REVOKED_get0_revocationDate(self._revoked) 2120 return _get_asn1_time(dt) 2121 2122 2123class CRL(object): 2124 """ 2125 A certificate revocation list. 2126 """ 2127 2128 def __init__(self): 2129 crl = _lib.X509_CRL_new() 2130 self._crl = _ffi.gc(crl, _lib.X509_CRL_free) 2131 2132 def to_cryptography(self): 2133 """ 2134 Export as a ``cryptography`` CRL. 2135 2136 :rtype: ``cryptography.x509.CertificateRevocationList`` 2137 2138 .. versionadded:: 17.1.0 2139 """ 2140 from cryptography.hazmat.backends.openssl.x509 import ( 2141 _CertificateRevocationList 2142 ) 2143 backend = _get_backend() 2144 return _CertificateRevocationList(backend, self._crl) 2145 2146 @classmethod 2147 def from_cryptography(cls, crypto_crl): 2148 """ 2149 Construct based on a ``cryptography`` *crypto_crl*. 2150 2151 :param crypto_crl: A ``cryptography`` certificate revocation list 2152 :type crypto_crl: ``cryptography.x509.CertificateRevocationList`` 2153 2154 :rtype: CRL 2155 2156 .. versionadded:: 17.1.0 2157 """ 2158 if not isinstance(crypto_crl, x509.CertificateRevocationList): 2159 raise TypeError("Must be a certificate revocation list") 2160 2161 crl = cls() 2162 crl._crl = crypto_crl._x509_crl 2163 return crl 2164 2165 def get_revoked(self): 2166 """ 2167 Return the revocations in this certificate revocation list. 2168 2169 These revocations will be provided by value, not by reference. 2170 That means it's okay to mutate them: it won't affect this CRL. 2171 2172 :return: The revocations in this CRL. 2173 :rtype: :class:`tuple` of :class:`Revocation` 2174 """ 2175 results = [] 2176 revoked_stack = _lib.X509_CRL_get_REVOKED(self._crl) 2177 for i in range(_lib.sk_X509_REVOKED_num(revoked_stack)): 2178 revoked = _lib.sk_X509_REVOKED_value(revoked_stack, i) 2179 revoked_copy = _lib.Cryptography_X509_REVOKED_dup(revoked) 2180 pyrev = Revoked.__new__(Revoked) 2181 pyrev._revoked = _ffi.gc(revoked_copy, _lib.X509_REVOKED_free) 2182 results.append(pyrev) 2183 if results: 2184 return tuple(results) 2185 2186 def add_revoked(self, revoked): 2187 """ 2188 Add a revoked (by value not reference) to the CRL structure 2189 2190 This revocation will be added by value, not by reference. That 2191 means it's okay to mutate it after adding: it won't affect 2192 this CRL. 2193 2194 :param Revoked revoked: The new revocation. 2195 :return: ``None`` 2196 """ 2197 copy = _lib.Cryptography_X509_REVOKED_dup(revoked._revoked) 2198 _openssl_assert(copy != _ffi.NULL) 2199 2200 add_result = _lib.X509_CRL_add0_revoked(self._crl, copy) 2201 _openssl_assert(add_result != 0) 2202 2203 def get_issuer(self): 2204 """ 2205 Get the CRL's issuer. 2206 2207 .. versionadded:: 16.1.0 2208 2209 :rtype: X509Name 2210 """ 2211 _issuer = _lib.X509_NAME_dup(_lib.X509_CRL_get_issuer(self._crl)) 2212 _openssl_assert(_issuer != _ffi.NULL) 2213 _issuer = _ffi.gc(_issuer, _lib.X509_NAME_free) 2214 issuer = X509Name.__new__(X509Name) 2215 issuer._name = _issuer 2216 return issuer 2217 2218 def set_version(self, version): 2219 """ 2220 Set the CRL version. 2221 2222 .. versionadded:: 16.1.0 2223 2224 :param int version: The version of the CRL. 2225 :return: ``None`` 2226 """ 2227 _openssl_assert(_lib.X509_CRL_set_version(self._crl, version) != 0) 2228 2229 def _set_boundary_time(self, which, when): 2230 return _set_asn1_time(which(self._crl), when) 2231 2232 def set_lastUpdate(self, when): 2233 """ 2234 Set when the CRL was last updated. 2235 2236 The timestamp is formatted as an ASN.1 TIME:: 2237 2238 YYYYMMDDhhmmssZ 2239 2240 .. versionadded:: 16.1.0 2241 2242 :param bytes when: A timestamp string. 2243 :return: ``None`` 2244 """ 2245 return self._set_boundary_time(_lib.X509_CRL_get_lastUpdate, when) 2246 2247 def set_nextUpdate(self, when): 2248 """ 2249 Set when the CRL will next be udpated. 2250 2251 The timestamp is formatted as an ASN.1 TIME:: 2252 2253 YYYYMMDDhhmmssZ 2254 2255 .. versionadded:: 16.1.0 2256 2257 :param bytes when: A timestamp string. 2258 :return: ``None`` 2259 """ 2260 return self._set_boundary_time(_lib.X509_CRL_get_nextUpdate, when) 2261 2262 def sign(self, issuer_cert, issuer_key, digest): 2263 """ 2264 Sign the CRL. 2265 2266 Signing a CRL enables clients to associate the CRL itself with an 2267 issuer. Before a CRL is meaningful to other OpenSSL functions, it must 2268 be signed by an issuer. 2269 2270 This method implicitly sets the issuer's name based on the issuer 2271 certificate and private key used to sign the CRL. 2272 2273 .. versionadded:: 16.1.0 2274 2275 :param X509 issuer_cert: The issuer's certificate. 2276 :param PKey issuer_key: The issuer's private key. 2277 :param bytes digest: The digest method to sign the CRL with. 2278 """ 2279 digest_obj = _lib.EVP_get_digestbyname(digest) 2280 _openssl_assert(digest_obj != _ffi.NULL) 2281 _lib.X509_CRL_set_issuer_name( 2282 self._crl, _lib.X509_get_subject_name(issuer_cert._x509)) 2283 _lib.X509_CRL_sort(self._crl) 2284 result = _lib.X509_CRL_sign(self._crl, issuer_key._pkey, digest_obj) 2285 _openssl_assert(result != 0) 2286 2287 def export(self, cert, key, type=FILETYPE_PEM, days=100, 2288 digest=_UNSPECIFIED): 2289 """ 2290 Export the CRL as a string. 2291 2292 :param X509 cert: The certificate used to sign the CRL. 2293 :param PKey key: The key used to sign the CRL. 2294 :param int type: The export format, either :data:`FILETYPE_PEM`, 2295 :data:`FILETYPE_ASN1`, or :data:`FILETYPE_TEXT`. 2296 :param int days: The number of days until the next update of this CRL. 2297 :param bytes digest: The name of the message digest to use (eg 2298 ``b"sha2566"``). 2299 :rtype: bytes 2300 """ 2301 2302 if not isinstance(cert, X509): 2303 raise TypeError("cert must be an X509 instance") 2304 if not isinstance(key, PKey): 2305 raise TypeError("key must be a PKey instance") 2306 if not isinstance(type, int): 2307 raise TypeError("type must be an integer") 2308 2309 if digest is _UNSPECIFIED: 2310 raise TypeError("digest must be provided") 2311 2312 digest_obj = _lib.EVP_get_digestbyname(digest) 2313 if digest_obj == _ffi.NULL: 2314 raise ValueError("No such digest method") 2315 2316 bio = _lib.BIO_new(_lib.BIO_s_mem()) 2317 _openssl_assert(bio != _ffi.NULL) 2318 2319 # A scratch time object to give different values to different CRL 2320 # fields 2321 sometime = _lib.ASN1_TIME_new() 2322 _openssl_assert(sometime != _ffi.NULL) 2323 2324 _lib.X509_gmtime_adj(sometime, 0) 2325 _lib.X509_CRL_set_lastUpdate(self._crl, sometime) 2326 2327 _lib.X509_gmtime_adj(sometime, days * 24 * 60 * 60) 2328 _lib.X509_CRL_set_nextUpdate(self._crl, sometime) 2329 2330 _lib.X509_CRL_set_issuer_name( 2331 self._crl, _lib.X509_get_subject_name(cert._x509) 2332 ) 2333 2334 sign_result = _lib.X509_CRL_sign(self._crl, key._pkey, digest_obj) 2335 if not sign_result: 2336 _raise_current_error() 2337 2338 return dump_crl(type, self) 2339 2340 2341CRLType = deprecated( 2342 CRL, __name__, 2343 "CRLType has been deprecated, use CRL instead", 2344 DeprecationWarning 2345) 2346 2347 2348class PKCS7(object): 2349 def type_is_signed(self): 2350 """ 2351 Check if this NID_pkcs7_signed object 2352 2353 :return: True if the PKCS7 is of type signed 2354 """ 2355 return bool(_lib.PKCS7_type_is_signed(self._pkcs7)) 2356 2357 def type_is_enveloped(self): 2358 """ 2359 Check if this NID_pkcs7_enveloped object 2360 2361 :returns: True if the PKCS7 is of type enveloped 2362 """ 2363 return bool(_lib.PKCS7_type_is_enveloped(self._pkcs7)) 2364 2365 def type_is_signedAndEnveloped(self): 2366 """ 2367 Check if this NID_pkcs7_signedAndEnveloped object 2368 2369 :returns: True if the PKCS7 is of type signedAndEnveloped 2370 """ 2371 return bool(_lib.PKCS7_type_is_signedAndEnveloped(self._pkcs7)) 2372 2373 def type_is_data(self): 2374 """ 2375 Check if this NID_pkcs7_data object 2376 2377 :return: True if the PKCS7 is of type data 2378 """ 2379 return bool(_lib.PKCS7_type_is_data(self._pkcs7)) 2380 2381 def get_type_name(self): 2382 """ 2383 Returns the type name of the PKCS7 structure 2384 2385 :return: A string with the typename 2386 """ 2387 nid = _lib.OBJ_obj2nid(self._pkcs7.type) 2388 string_type = _lib.OBJ_nid2sn(nid) 2389 return _ffi.string(string_type) 2390 2391 2392PKCS7Type = deprecated( 2393 PKCS7, __name__, 2394 "PKCS7Type has been deprecated, use PKCS7 instead", 2395 DeprecationWarning 2396) 2397 2398 2399class PKCS12(object): 2400 """ 2401 A PKCS #12 archive. 2402 """ 2403 2404 def __init__(self): 2405 self._pkey = None 2406 self._cert = None 2407 self._cacerts = None 2408 self._friendlyname = None 2409 2410 def get_certificate(self): 2411 """ 2412 Get the certificate in the PKCS #12 structure. 2413 2414 :return: The certificate, or :py:const:`None` if there is none. 2415 :rtype: :py:class:`X509` or :py:const:`None` 2416 """ 2417 return self._cert 2418 2419 def set_certificate(self, cert): 2420 """ 2421 Set the certificate in the PKCS #12 structure. 2422 2423 :param cert: The new certificate, or :py:const:`None` to unset it. 2424 :type cert: :py:class:`X509` or :py:const:`None` 2425 2426 :return: ``None`` 2427 """ 2428 if not isinstance(cert, X509): 2429 raise TypeError("cert must be an X509 instance") 2430 self._cert = cert 2431 2432 def get_privatekey(self): 2433 """ 2434 Get the private key in the PKCS #12 structure. 2435 2436 :return: The private key, or :py:const:`None` if there is none. 2437 :rtype: :py:class:`PKey` 2438 """ 2439 return self._pkey 2440 2441 def set_privatekey(self, pkey): 2442 """ 2443 Set the certificate portion of the PKCS #12 structure. 2444 2445 :param pkey: The new private key, or :py:const:`None` to unset it. 2446 :type pkey: :py:class:`PKey` or :py:const:`None` 2447 2448 :return: ``None`` 2449 """ 2450 if not isinstance(pkey, PKey): 2451 raise TypeError("pkey must be a PKey instance") 2452 self._pkey = pkey 2453 2454 def get_ca_certificates(self): 2455 """ 2456 Get the CA certificates in the PKCS #12 structure. 2457 2458 :return: A tuple with the CA certificates in the chain, or 2459 :py:const:`None` if there are none. 2460 :rtype: :py:class:`tuple` of :py:class:`X509` or :py:const:`None` 2461 """ 2462 if self._cacerts is not None: 2463 return tuple(self._cacerts) 2464 2465 def set_ca_certificates(self, cacerts): 2466 """ 2467 Replace or set the CA certificates within the PKCS12 object. 2468 2469 :param cacerts: The new CA certificates, or :py:const:`None` to unset 2470 them. 2471 :type cacerts: An iterable of :py:class:`X509` or :py:const:`None` 2472 2473 :return: ``None`` 2474 """ 2475 if cacerts is None: 2476 self._cacerts = None 2477 else: 2478 cacerts = list(cacerts) 2479 for cert in cacerts: 2480 if not isinstance(cert, X509): 2481 raise TypeError( 2482 "iterable must only contain X509 instances" 2483 ) 2484 self._cacerts = cacerts 2485 2486 def set_friendlyname(self, name): 2487 """ 2488 Set the friendly name in the PKCS #12 structure. 2489 2490 :param name: The new friendly name, or :py:const:`None` to unset. 2491 :type name: :py:class:`bytes` or :py:const:`None` 2492 2493 :return: ``None`` 2494 """ 2495 if name is None: 2496 self._friendlyname = None 2497 elif not isinstance(name, bytes): 2498 raise TypeError( 2499 "name must be a byte string or None (not %r)" % (name,) 2500 ) 2501 self._friendlyname = name 2502 2503 def get_friendlyname(self): 2504 """ 2505 Get the friendly name in the PKCS# 12 structure. 2506 2507 :returns: The friendly name, or :py:const:`None` if there is none. 2508 :rtype: :py:class:`bytes` or :py:const:`None` 2509 """ 2510 return self._friendlyname 2511 2512 def export(self, passphrase=None, iter=2048, maciter=1): 2513 """ 2514 Dump a PKCS12 object as a string. 2515 2516 For more information, see the :c:func:`PKCS12_create` man page. 2517 2518 :param passphrase: The passphrase used to encrypt the structure. Unlike 2519 some other passphrase arguments, this *must* be a string, not a 2520 callback. 2521 :type passphrase: :py:data:`bytes` 2522 2523 :param iter: Number of times to repeat the encryption step. 2524 :type iter: :py:data:`int` 2525 2526 :param maciter: Number of times to repeat the MAC step. 2527 :type maciter: :py:data:`int` 2528 2529 :return: The string representation of the PKCS #12 structure. 2530 :rtype: 2531 """ 2532 passphrase = _text_to_bytes_and_warn("passphrase", passphrase) 2533 2534 if self._cacerts is None: 2535 cacerts = _ffi.NULL 2536 else: 2537 cacerts = _lib.sk_X509_new_null() 2538 cacerts = _ffi.gc(cacerts, _lib.sk_X509_free) 2539 for cert in self._cacerts: 2540 _lib.sk_X509_push(cacerts, cert._x509) 2541 2542 if passphrase is None: 2543 passphrase = _ffi.NULL 2544 2545 friendlyname = self._friendlyname 2546 if friendlyname is None: 2547 friendlyname = _ffi.NULL 2548 2549 if self._pkey is None: 2550 pkey = _ffi.NULL 2551 else: 2552 pkey = self._pkey._pkey 2553 2554 if self._cert is None: 2555 cert = _ffi.NULL 2556 else: 2557 cert = self._cert._x509 2558 2559 pkcs12 = _lib.PKCS12_create( 2560 passphrase, friendlyname, pkey, cert, cacerts, 2561 _lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC, 2562 _lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC, 2563 iter, maciter, 0) 2564 if pkcs12 == _ffi.NULL: 2565 _raise_current_error() 2566 pkcs12 = _ffi.gc(pkcs12, _lib.PKCS12_free) 2567 2568 bio = _new_mem_buf() 2569 _lib.i2d_PKCS12_bio(bio, pkcs12) 2570 return _bio_to_string(bio) 2571 2572 2573PKCS12Type = deprecated( 2574 PKCS12, __name__, 2575 "PKCS12Type has been deprecated, use PKCS12 instead", 2576 DeprecationWarning 2577) 2578 2579 2580class NetscapeSPKI(object): 2581 """ 2582 A Netscape SPKI object. 2583 """ 2584 2585 def __init__(self): 2586 spki = _lib.NETSCAPE_SPKI_new() 2587 self._spki = _ffi.gc(spki, _lib.NETSCAPE_SPKI_free) 2588 2589 def sign(self, pkey, digest): 2590 """ 2591 Sign the certificate request with this key and digest type. 2592 2593 :param pkey: The private key to sign with. 2594 :type pkey: :py:class:`PKey` 2595 2596 :param digest: The message digest to use. 2597 :type digest: :py:class:`bytes` 2598 2599 :return: ``None`` 2600 """ 2601 if pkey._only_public: 2602 raise ValueError("Key has only public part") 2603 2604 if not pkey._initialized: 2605 raise ValueError("Key is uninitialized") 2606 2607 digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest)) 2608 if digest_obj == _ffi.NULL: 2609 raise ValueError("No such digest method") 2610 2611 sign_result = _lib.NETSCAPE_SPKI_sign( 2612 self._spki, pkey._pkey, digest_obj 2613 ) 2614 _openssl_assert(sign_result > 0) 2615 2616 def verify(self, key): 2617 """ 2618 Verifies a signature on a certificate request. 2619 2620 :param PKey key: The public key that signature is supposedly from. 2621 2622 :return: ``True`` if the signature is correct. 2623 :rtype: bool 2624 2625 :raises OpenSSL.crypto.Error: If the signature is invalid, or there was 2626 a problem verifying the signature. 2627 """ 2628 answer = _lib.NETSCAPE_SPKI_verify(self._spki, key._pkey) 2629 if answer <= 0: 2630 _raise_current_error() 2631 return True 2632 2633 def b64_encode(self): 2634 """ 2635 Generate a base64 encoded representation of this SPKI object. 2636 2637 :return: The base64 encoded string. 2638 :rtype: :py:class:`bytes` 2639 """ 2640 encoded = _lib.NETSCAPE_SPKI_b64_encode(self._spki) 2641 result = _ffi.string(encoded) 2642 _lib.OPENSSL_free(encoded) 2643 return result 2644 2645 def get_pubkey(self): 2646 """ 2647 Get the public key of this certificate. 2648 2649 :return: The public key. 2650 :rtype: :py:class:`PKey` 2651 """ 2652 pkey = PKey.__new__(PKey) 2653 pkey._pkey = _lib.NETSCAPE_SPKI_get_pubkey(self._spki) 2654 _openssl_assert(pkey._pkey != _ffi.NULL) 2655 pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free) 2656 pkey._only_public = True 2657 return pkey 2658 2659 def set_pubkey(self, pkey): 2660 """ 2661 Set the public key of the certificate 2662 2663 :param pkey: The public key 2664 :return: ``None`` 2665 """ 2666 set_result = _lib.NETSCAPE_SPKI_set_pubkey(self._spki, pkey._pkey) 2667 _openssl_assert(set_result == 1) 2668 2669 2670NetscapeSPKIType = deprecated( 2671 NetscapeSPKI, __name__, 2672 "NetscapeSPKIType has been deprecated, use NetscapeSPKI instead", 2673 DeprecationWarning 2674) 2675 2676 2677class _PassphraseHelper(object): 2678 def __init__(self, type, passphrase, more_args=False, truncate=False): 2679 if type != FILETYPE_PEM and passphrase is not None: 2680 raise ValueError( 2681 "only FILETYPE_PEM key format supports encryption" 2682 ) 2683 self._passphrase = passphrase 2684 self._more_args = more_args 2685 self._truncate = truncate 2686 self._problems = [] 2687 2688 @property 2689 def callback(self): 2690 if self._passphrase is None: 2691 return _ffi.NULL 2692 elif isinstance(self._passphrase, bytes): 2693 return _ffi.NULL 2694 elif callable(self._passphrase): 2695 return _ffi.callback("pem_password_cb", self._read_passphrase) 2696 else: 2697 raise TypeError( 2698 "Last argument must be a byte string or a callable." 2699 ) 2700 2701 @property 2702 def callback_args(self): 2703 if self._passphrase is None: 2704 return _ffi.NULL 2705 elif isinstance(self._passphrase, bytes): 2706 return self._passphrase 2707 elif callable(self._passphrase): 2708 return _ffi.NULL 2709 else: 2710 raise TypeError( 2711 "Last argument must be a byte string or a callable." 2712 ) 2713 2714 def raise_if_problem(self, exceptionType=Error): 2715 if self._problems: 2716 2717 # Flush the OpenSSL error queue 2718 try: 2719 _exception_from_error_queue(exceptionType) 2720 except exceptionType: 2721 pass 2722 2723 raise self._problems.pop(0) 2724 2725 def _read_passphrase(self, buf, size, rwflag, userdata): 2726 try: 2727 if self._more_args: 2728 result = self._passphrase(size, rwflag, userdata) 2729 else: 2730 result = self._passphrase(rwflag) 2731 if not isinstance(result, bytes): 2732 raise ValueError("String expected") 2733 if len(result) > size: 2734 if self._truncate: 2735 result = result[:size] 2736 else: 2737 raise ValueError( 2738 "passphrase returned by callback is too long" 2739 ) 2740 for i in range(len(result)): 2741 buf[i] = result[i:i + 1] 2742 return len(result) 2743 except Exception as e: 2744 self._problems.append(e) 2745 return 0 2746 2747 2748def load_publickey(type, buffer): 2749 """ 2750 Load a public key from a buffer. 2751 2752 :param type: The file type (one of :data:`FILETYPE_PEM`, 2753 :data:`FILETYPE_ASN1`). 2754 :param buffer: The buffer the key is stored in. 2755 :type buffer: A Python string object, either unicode or bytestring. 2756 :return: The PKey object. 2757 :rtype: :class:`PKey` 2758 """ 2759 if isinstance(buffer, _text_type): 2760 buffer = buffer.encode("ascii") 2761 2762 bio = _new_mem_buf(buffer) 2763 2764 if type == FILETYPE_PEM: 2765 evp_pkey = _lib.PEM_read_bio_PUBKEY( 2766 bio, _ffi.NULL, _ffi.NULL, _ffi.NULL) 2767 elif type == FILETYPE_ASN1: 2768 evp_pkey = _lib.d2i_PUBKEY_bio(bio, _ffi.NULL) 2769 else: 2770 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1") 2771 2772 if evp_pkey == _ffi.NULL: 2773 _raise_current_error() 2774 2775 pkey = PKey.__new__(PKey) 2776 pkey._pkey = _ffi.gc(evp_pkey, _lib.EVP_PKEY_free) 2777 pkey._only_public = True 2778 return pkey 2779 2780 2781def load_privatekey(type, buffer, passphrase=None): 2782 """ 2783 Load a private key (PKey) from the string *buffer* encoded with the type 2784 *type*. 2785 2786 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1) 2787 :param buffer: The buffer the key is stored in 2788 :param passphrase: (optional) if encrypted PEM format, this can be 2789 either the passphrase to use, or a callback for 2790 providing the passphrase. 2791 2792 :return: The PKey object 2793 """ 2794 if isinstance(buffer, _text_type): 2795 buffer = buffer.encode("ascii") 2796 2797 bio = _new_mem_buf(buffer) 2798 2799 helper = _PassphraseHelper(type, passphrase) 2800 if type == FILETYPE_PEM: 2801 evp_pkey = _lib.PEM_read_bio_PrivateKey( 2802 bio, _ffi.NULL, helper.callback, helper.callback_args) 2803 helper.raise_if_problem() 2804 elif type == FILETYPE_ASN1: 2805 evp_pkey = _lib.d2i_PrivateKey_bio(bio, _ffi.NULL) 2806 else: 2807 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1") 2808 2809 if evp_pkey == _ffi.NULL: 2810 _raise_current_error() 2811 2812 pkey = PKey.__new__(PKey) 2813 pkey._pkey = _ffi.gc(evp_pkey, _lib.EVP_PKEY_free) 2814 return pkey 2815 2816 2817def dump_certificate_request(type, req): 2818 """ 2819 Dump the certificate request *req* into a buffer string encoded with the 2820 type *type*. 2821 2822 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1) 2823 :param req: The certificate request to dump 2824 :return: The buffer with the dumped certificate request in 2825 """ 2826 bio = _new_mem_buf() 2827 2828 if type == FILETYPE_PEM: 2829 result_code = _lib.PEM_write_bio_X509_REQ(bio, req._req) 2830 elif type == FILETYPE_ASN1: 2831 result_code = _lib.i2d_X509_REQ_bio(bio, req._req) 2832 elif type == FILETYPE_TEXT: 2833 result_code = _lib.X509_REQ_print_ex(bio, req._req, 0, 0) 2834 else: 2835 raise ValueError( 2836 "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or " 2837 "FILETYPE_TEXT" 2838 ) 2839 2840 _openssl_assert(result_code != 0) 2841 2842 return _bio_to_string(bio) 2843 2844 2845def load_certificate_request(type, buffer): 2846 """ 2847 Load a certificate request (X509Req) from the string *buffer* encoded with 2848 the type *type*. 2849 2850 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1) 2851 :param buffer: The buffer the certificate request is stored in 2852 :return: The X509Req object 2853 """ 2854 if isinstance(buffer, _text_type): 2855 buffer = buffer.encode("ascii") 2856 2857 bio = _new_mem_buf(buffer) 2858 2859 if type == FILETYPE_PEM: 2860 req = _lib.PEM_read_bio_X509_REQ(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL) 2861 elif type == FILETYPE_ASN1: 2862 req = _lib.d2i_X509_REQ_bio(bio, _ffi.NULL) 2863 else: 2864 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1") 2865 2866 _openssl_assert(req != _ffi.NULL) 2867 2868 x509req = X509Req.__new__(X509Req) 2869 x509req._req = _ffi.gc(req, _lib.X509_REQ_free) 2870 return x509req 2871 2872 2873def sign(pkey, data, digest): 2874 """ 2875 Sign a data string using the given key and message digest. 2876 2877 :param pkey: PKey to sign with 2878 :param data: data to be signed 2879 :param digest: message digest to use 2880 :return: signature 2881 2882 .. versionadded:: 0.11 2883 """ 2884 data = _text_to_bytes_and_warn("data", data) 2885 2886 digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest)) 2887 if digest_obj == _ffi.NULL: 2888 raise ValueError("No such digest method") 2889 2890 md_ctx = _lib.Cryptography_EVP_MD_CTX_new() 2891 md_ctx = _ffi.gc(md_ctx, _lib.Cryptography_EVP_MD_CTX_free) 2892 2893 _lib.EVP_SignInit(md_ctx, digest_obj) 2894 _lib.EVP_SignUpdate(md_ctx, data, len(data)) 2895 2896 length = _lib.EVP_PKEY_size(pkey._pkey) 2897 _openssl_assert(length > 0) 2898 signature_buffer = _ffi.new("unsigned char[]", length) 2899 signature_length = _ffi.new("unsigned int *") 2900 final_result = _lib.EVP_SignFinal( 2901 md_ctx, signature_buffer, signature_length, pkey._pkey) 2902 _openssl_assert(final_result == 1) 2903 2904 return _ffi.buffer(signature_buffer, signature_length[0])[:] 2905 2906 2907def verify(cert, signature, data, digest): 2908 """ 2909 Verify the signature for a data string. 2910 2911 :param cert: signing certificate (X509 object) corresponding to the 2912 private key which generated the signature. 2913 :param signature: signature returned by sign function 2914 :param data: data to be verified 2915 :param digest: message digest to use 2916 :return: ``None`` if the signature is correct, raise exception otherwise. 2917 2918 .. versionadded:: 0.11 2919 """ 2920 data = _text_to_bytes_and_warn("data", data) 2921 2922 digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest)) 2923 if digest_obj == _ffi.NULL: 2924 raise ValueError("No such digest method") 2925 2926 pkey = _lib.X509_get_pubkey(cert._x509) 2927 _openssl_assert(pkey != _ffi.NULL) 2928 pkey = _ffi.gc(pkey, _lib.EVP_PKEY_free) 2929 2930 md_ctx = _lib.Cryptography_EVP_MD_CTX_new() 2931 md_ctx = _ffi.gc(md_ctx, _lib.Cryptography_EVP_MD_CTX_free) 2932 2933 _lib.EVP_VerifyInit(md_ctx, digest_obj) 2934 _lib.EVP_VerifyUpdate(md_ctx, data, len(data)) 2935 verify_result = _lib.EVP_VerifyFinal( 2936 md_ctx, signature, len(signature), pkey 2937 ) 2938 2939 if verify_result != 1: 2940 _raise_current_error() 2941 2942 2943def dump_crl(type, crl): 2944 """ 2945 Dump a certificate revocation list to a buffer. 2946 2947 :param type: The file type (one of ``FILETYPE_PEM``, ``FILETYPE_ASN1``, or 2948 ``FILETYPE_TEXT``). 2949 :param CRL crl: The CRL to dump. 2950 2951 :return: The buffer with the CRL. 2952 :rtype: bytes 2953 """ 2954 bio = _new_mem_buf() 2955 2956 if type == FILETYPE_PEM: 2957 ret = _lib.PEM_write_bio_X509_CRL(bio, crl._crl) 2958 elif type == FILETYPE_ASN1: 2959 ret = _lib.i2d_X509_CRL_bio(bio, crl._crl) 2960 elif type == FILETYPE_TEXT: 2961 ret = _lib.X509_CRL_print(bio, crl._crl) 2962 else: 2963 raise ValueError( 2964 "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or " 2965 "FILETYPE_TEXT") 2966 2967 assert ret == 1 2968 return _bio_to_string(bio) 2969 2970 2971def load_crl(type, buffer): 2972 """ 2973 Load Certificate Revocation List (CRL) data from a string *buffer*. 2974 *buffer* encoded with the type *type*. 2975 2976 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1) 2977 :param buffer: The buffer the CRL is stored in 2978 2979 :return: The PKey object 2980 """ 2981 if isinstance(buffer, _text_type): 2982 buffer = buffer.encode("ascii") 2983 2984 bio = _new_mem_buf(buffer) 2985 2986 if type == FILETYPE_PEM: 2987 crl = _lib.PEM_read_bio_X509_CRL(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL) 2988 elif type == FILETYPE_ASN1: 2989 crl = _lib.d2i_X509_CRL_bio(bio, _ffi.NULL) 2990 else: 2991 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1") 2992 2993 if crl == _ffi.NULL: 2994 _raise_current_error() 2995 2996 result = CRL.__new__(CRL) 2997 result._crl = _ffi.gc(crl, _lib.X509_CRL_free) 2998 return result 2999 3000 3001def load_pkcs7_data(type, buffer): 3002 """ 3003 Load pkcs7 data from the string *buffer* encoded with the type 3004 *type*. 3005 3006 :param type: The file type (one of FILETYPE_PEM or FILETYPE_ASN1) 3007 :param buffer: The buffer with the pkcs7 data. 3008 :return: The PKCS7 object 3009 """ 3010 if isinstance(buffer, _text_type): 3011 buffer = buffer.encode("ascii") 3012 3013 bio = _new_mem_buf(buffer) 3014 3015 if type == FILETYPE_PEM: 3016 pkcs7 = _lib.PEM_read_bio_PKCS7(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL) 3017 elif type == FILETYPE_ASN1: 3018 pkcs7 = _lib.d2i_PKCS7_bio(bio, _ffi.NULL) 3019 else: 3020 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1") 3021 3022 if pkcs7 == _ffi.NULL: 3023 _raise_current_error() 3024 3025 pypkcs7 = PKCS7.__new__(PKCS7) 3026 pypkcs7._pkcs7 = _ffi.gc(pkcs7, _lib.PKCS7_free) 3027 return pypkcs7 3028 3029 3030def load_pkcs12(buffer, passphrase=None): 3031 """ 3032 Load pkcs12 data from the string *buffer*. If the pkcs12 structure is 3033 encrypted, a *passphrase* must be included. The MAC is always 3034 checked and thus required. 3035 3036 See also the man page for the C function :py:func:`PKCS12_parse`. 3037 3038 :param buffer: The buffer the certificate is stored in 3039 :param passphrase: (Optional) The password to decrypt the PKCS12 lump 3040 :returns: The PKCS12 object 3041 """ 3042 passphrase = _text_to_bytes_and_warn("passphrase", passphrase) 3043 3044 if isinstance(buffer, _text_type): 3045 buffer = buffer.encode("ascii") 3046 3047 bio = _new_mem_buf(buffer) 3048 3049 # Use null passphrase if passphrase is None or empty string. With PKCS#12 3050 # password based encryption no password and a zero length password are two 3051 # different things, but OpenSSL implementation will try both to figure out 3052 # which one works. 3053 if not passphrase: 3054 passphrase = _ffi.NULL 3055 3056 p12 = _lib.d2i_PKCS12_bio(bio, _ffi.NULL) 3057 if p12 == _ffi.NULL: 3058 _raise_current_error() 3059 p12 = _ffi.gc(p12, _lib.PKCS12_free) 3060 3061 pkey = _ffi.new("EVP_PKEY**") 3062 cert = _ffi.new("X509**") 3063 cacerts = _ffi.new("Cryptography_STACK_OF_X509**") 3064 3065 parse_result = _lib.PKCS12_parse(p12, passphrase, pkey, cert, cacerts) 3066 if not parse_result: 3067 _raise_current_error() 3068 3069 cacerts = _ffi.gc(cacerts[0], _lib.sk_X509_free) 3070 3071 # openssl 1.0.0 sometimes leaves an X509_check_private_key error in the 3072 # queue for no particular reason. This error isn't interesting to anyone 3073 # outside this function. It's not even interesting to us. Get rid of it. 3074 try: 3075 _raise_current_error() 3076 except Error: 3077 pass 3078 3079 if pkey[0] == _ffi.NULL: 3080 pykey = None 3081 else: 3082 pykey = PKey.__new__(PKey) 3083 pykey._pkey = _ffi.gc(pkey[0], _lib.EVP_PKEY_free) 3084 3085 if cert[0] == _ffi.NULL: 3086 pycert = None 3087 friendlyname = None 3088 else: 3089 pycert = X509._from_raw_x509_ptr(cert[0]) 3090 3091 friendlyname_length = _ffi.new("int*") 3092 friendlyname_buffer = _lib.X509_alias_get0( 3093 cert[0], friendlyname_length 3094 ) 3095 friendlyname = _ffi.buffer( 3096 friendlyname_buffer, friendlyname_length[0] 3097 )[:] 3098 if friendlyname_buffer == _ffi.NULL: 3099 friendlyname = None 3100 3101 pycacerts = [] 3102 for i in range(_lib.sk_X509_num(cacerts)): 3103 x509 = _lib.sk_X509_value(cacerts, i) 3104 pycacert = X509._from_raw_x509_ptr(x509) 3105 pycacerts.append(pycacert) 3106 if not pycacerts: 3107 pycacerts = None 3108 3109 pkcs12 = PKCS12.__new__(PKCS12) 3110 pkcs12._pkey = pykey 3111 pkcs12._cert = pycert 3112 pkcs12._cacerts = pycacerts 3113 pkcs12._friendlyname = friendlyname 3114 return pkcs12 3115 3116 3117# There are no direct unit tests for this initialization. It is tested 3118# indirectly since it is necessary for functions like dump_privatekey when 3119# using encryption. 3120# 3121# Thus OpenSSL.test.test_crypto.FunctionTests.test_dump_privatekey_passphrase 3122# and some other similar tests may fail without this (though they may not if 3123# the Python runtime has already done some initialization of the underlying 3124# OpenSSL library (and is linked against the same one that cryptography is 3125# using)). 3126_lib.OpenSSL_add_all_algorithms() 3127 3128# This is similar but exercised mainly by exception_from_error_queue. It calls 3129# both ERR_load_crypto_strings() and ERR_load_SSL_strings(). 3130_lib.SSL_load_error_strings() 3131 3132 3133# Set the default string mask to match OpenSSL upstream (since 2005) and 3134# RFC5280 recommendations. 3135_lib.ASN1_STRING_set_default_mask_asc(b'utf8only') 3136