1import calendar 2import datetime 3 4from base64 import b16encode 5from functools import partial 6from operator import __eq__, __ne__, __lt__, __le__, __gt__, __ge__ 7 8from six import ( 9 integer_types as _integer_types, 10 text_type as _text_type, 11 PY2 as _PY2, 12) 13 14from cryptography import utils, x509 15from cryptography.hazmat.primitives.asymmetric import dsa, rsa 16 17from OpenSSL._util import ( 18 ffi as _ffi, 19 lib as _lib, 20 exception_from_error_queue as _exception_from_error_queue, 21 byte_string as _byte_string, 22 native as _native, 23 path_string as _path_string, 24 UNSPECIFIED as _UNSPECIFIED, 25 text_to_bytes_and_warn as _text_to_bytes_and_warn, 26 make_assert as _make_assert, 27) 28 29__all__ = [ 30 "FILETYPE_PEM", 31 "FILETYPE_ASN1", 32 "FILETYPE_TEXT", 33 "TYPE_RSA", 34 "TYPE_DSA", 35 "Error", 36 "PKey", 37 "get_elliptic_curves", 38 "get_elliptic_curve", 39 "X509Name", 40 "X509Extension", 41 "X509Req", 42 "X509", 43 "X509StoreFlags", 44 "X509Store", 45 "X509StoreContextError", 46 "X509StoreContext", 47 "load_certificate", 48 "dump_certificate", 49 "dump_publickey", 50 "dump_privatekey", 51 "Revoked", 52 "CRL", 53 "PKCS7", 54 "PKCS12", 55 "NetscapeSPKI", 56 "load_publickey", 57 "load_privatekey", 58 "dump_certificate_request", 59 "load_certificate_request", 60 "sign", 61 "verify", 62 "dump_crl", 63 "load_crl", 64 "load_pkcs7_data", 65 "load_pkcs12", 66] 67 68FILETYPE_PEM = _lib.SSL_FILETYPE_PEM 69FILETYPE_ASN1 = _lib.SSL_FILETYPE_ASN1 70 71# TODO This was an API mistake. OpenSSL has no such constant. 72FILETYPE_TEXT = 2 ** 16 - 1 73 74TYPE_RSA = _lib.EVP_PKEY_RSA 75TYPE_DSA = _lib.EVP_PKEY_DSA 76TYPE_DH = _lib.EVP_PKEY_DH 77TYPE_EC = _lib.EVP_PKEY_EC 78 79 80class Error(Exception): 81 """ 82 An error occurred in an `OpenSSL.crypto` API. 83 """ 84 85 86_raise_current_error = partial(_exception_from_error_queue, Error) 87_openssl_assert = _make_assert(Error) 88 89 90def _get_backend(): 91 """ 92 Importing the backend from cryptography has the side effect of activating 93 the osrandom engine. This mutates the global state of OpenSSL in the 94 process and causes issues for various programs that use subinterpreters or 95 embed Python. By putting the import in this function we can avoid 96 triggering this side effect unless _get_backend is called. 97 """ 98 from cryptography.hazmat.backends.openssl.backend import backend 99 100 return backend 101 102 103def _untested_error(where): 104 """ 105 An OpenSSL API failed somehow. Additionally, the failure which was 106 encountered isn't one that's exercised by the test suite so future behavior 107 of pyOpenSSL is now somewhat less predictable. 108 """ 109 raise RuntimeError("Unknown %s failure" % (where,)) 110 111 112def _new_mem_buf(buffer=None): 113 """ 114 Allocate a new OpenSSL memory BIO. 115 116 Arrange for the garbage collector to clean it up automatically. 117 118 :param buffer: None or some bytes to use to put into the BIO so that they 119 can be read out. 120 """ 121 if buffer is None: 122 bio = _lib.BIO_new(_lib.BIO_s_mem()) 123 free = _lib.BIO_free 124 else: 125 data = _ffi.new("char[]", buffer) 126 bio = _lib.BIO_new_mem_buf(data, len(buffer)) 127 128 # Keep the memory alive as long as the bio is alive! 129 def free(bio, ref=data): 130 return _lib.BIO_free(bio) 131 132 _openssl_assert(bio != _ffi.NULL) 133 134 bio = _ffi.gc(bio, free) 135 return bio 136 137 138def _bio_to_string(bio): 139 """ 140 Copy the contents of an OpenSSL BIO object into a Python byte string. 141 """ 142 result_buffer = _ffi.new("char**") 143 buffer_length = _lib.BIO_get_mem_data(bio, result_buffer) 144 return _ffi.buffer(result_buffer[0], buffer_length)[:] 145 146 147def _set_asn1_time(boundary, when): 148 """ 149 The the time value of an ASN1 time object. 150 151 @param boundary: An ASN1_TIME pointer (or an object safely 152 castable to that type) which will have its value set. 153 @param when: A string representation of the desired time value. 154 155 @raise TypeError: If C{when} is not a L{bytes} string. 156 @raise ValueError: If C{when} does not represent a time in the required 157 format. 158 @raise RuntimeError: If the time value cannot be set for some other 159 (unspecified) reason. 160 """ 161 if not isinstance(when, bytes): 162 raise TypeError("when must be a byte string") 163 164 set_result = _lib.ASN1_TIME_set_string(boundary, when) 165 if set_result == 0: 166 raise ValueError("Invalid string") 167 168 169def _get_asn1_time(timestamp): 170 """ 171 Retrieve the time value of an ASN1 time object. 172 173 @param timestamp: An ASN1_GENERALIZEDTIME* (or an object safely castable to 174 that type) from which the time value will be retrieved. 175 176 @return: The time value from C{timestamp} as a L{bytes} string in a certain 177 format. Or C{None} if the object contains no time value. 178 """ 179 string_timestamp = _ffi.cast("ASN1_STRING*", timestamp) 180 if _lib.ASN1_STRING_length(string_timestamp) == 0: 181 return None 182 elif ( 183 _lib.ASN1_STRING_type(string_timestamp) == _lib.V_ASN1_GENERALIZEDTIME 184 ): 185 return _ffi.string(_lib.ASN1_STRING_data(string_timestamp)) 186 else: 187 generalized_timestamp = _ffi.new("ASN1_GENERALIZEDTIME**") 188 _lib.ASN1_TIME_to_generalizedtime(timestamp, generalized_timestamp) 189 if generalized_timestamp[0] == _ffi.NULL: 190 # This may happen: 191 # - if timestamp was not an ASN1_TIME 192 # - if allocating memory for the ASN1_GENERALIZEDTIME failed 193 # - if a copy of the time data from timestamp cannot be made for 194 # the newly allocated ASN1_GENERALIZEDTIME 195 # 196 # These are difficult to test. cffi enforces the ASN1_TIME type. 197 # Memory allocation failures are a pain to trigger 198 # deterministically. 199 _untested_error("ASN1_TIME_to_generalizedtime") 200 else: 201 string_timestamp = _ffi.cast( 202 "ASN1_STRING*", generalized_timestamp[0] 203 ) 204 string_data = _lib.ASN1_STRING_data(string_timestamp) 205 string_result = _ffi.string(string_data) 206 _lib.ASN1_GENERALIZEDTIME_free(generalized_timestamp[0]) 207 return string_result 208 209 210class _X509NameInvalidator(object): 211 def __init__(self): 212 self._names = [] 213 214 def add(self, name): 215 self._names.append(name) 216 217 def clear(self): 218 for name in self._names: 219 # Breaks the object, but also prevents UAF! 220 del name._name 221 222 223class PKey(object): 224 """ 225 A class representing an DSA or RSA public key or key pair. 226 """ 227 228 _only_public = False 229 _initialized = True 230 231 def __init__(self): 232 pkey = _lib.EVP_PKEY_new() 233 self._pkey = _ffi.gc(pkey, _lib.EVP_PKEY_free) 234 self._initialized = False 235 236 def to_cryptography_key(self): 237 """ 238 Export as a ``cryptography`` key. 239 240 :rtype: One of ``cryptography``'s `key interfaces`_. 241 242 .. _key interfaces: https://cryptography.io/en/latest/hazmat/\ 243 primitives/asymmetric/rsa/#key-interfaces 244 245 .. versionadded:: 16.1.0 246 """ 247 backend = _get_backend() 248 if self._only_public: 249 return backend._evp_pkey_to_public_key(self._pkey) 250 else: 251 return backend._evp_pkey_to_private_key(self._pkey) 252 253 @classmethod 254 def from_cryptography_key(cls, crypto_key): 255 """ 256 Construct based on a ``cryptography`` *crypto_key*. 257 258 :param crypto_key: A ``cryptography`` key. 259 :type crypto_key: One of ``cryptography``'s `key interfaces`_. 260 261 :rtype: PKey 262 263 .. versionadded:: 16.1.0 264 """ 265 pkey = cls() 266 if not isinstance( 267 crypto_key, 268 ( 269 rsa.RSAPublicKey, 270 rsa.RSAPrivateKey, 271 dsa.DSAPublicKey, 272 dsa.DSAPrivateKey, 273 ), 274 ): 275 raise TypeError("Unsupported key type") 276 277 pkey._pkey = crypto_key._evp_pkey 278 if isinstance(crypto_key, (rsa.RSAPublicKey, dsa.DSAPublicKey)): 279 pkey._only_public = True 280 pkey._initialized = True 281 return pkey 282 283 def generate_key(self, type, bits): 284 """ 285 Generate a key pair of the given type, with the given number of bits. 286 287 This generates a key "into" the this object. 288 289 :param type: The key type. 290 :type type: :py:data:`TYPE_RSA` or :py:data:`TYPE_DSA` 291 :param bits: The number of bits. 292 :type bits: :py:data:`int` ``>= 0`` 293 :raises TypeError: If :py:data:`type` or :py:data:`bits` isn't 294 of the appropriate type. 295 :raises ValueError: If the number of bits isn't an integer of 296 the appropriate size. 297 :return: ``None`` 298 """ 299 if not isinstance(type, int): 300 raise TypeError("type must be an integer") 301 302 if not isinstance(bits, int): 303 raise TypeError("bits must be an integer") 304 305 if type == TYPE_RSA: 306 if bits <= 0: 307 raise ValueError("Invalid number of bits") 308 309 # TODO Check error return 310 exponent = _lib.BN_new() 311 exponent = _ffi.gc(exponent, _lib.BN_free) 312 _lib.BN_set_word(exponent, _lib.RSA_F4) 313 314 rsa = _lib.RSA_new() 315 316 result = _lib.RSA_generate_key_ex(rsa, bits, exponent, _ffi.NULL) 317 _openssl_assert(result == 1) 318 319 result = _lib.EVP_PKEY_assign_RSA(self._pkey, rsa) 320 _openssl_assert(result == 1) 321 322 elif type == TYPE_DSA: 323 dsa = _lib.DSA_new() 324 _openssl_assert(dsa != _ffi.NULL) 325 326 dsa = _ffi.gc(dsa, _lib.DSA_free) 327 res = _lib.DSA_generate_parameters_ex( 328 dsa, bits, _ffi.NULL, 0, _ffi.NULL, _ffi.NULL, _ffi.NULL 329 ) 330 _openssl_assert(res == 1) 331 332 _openssl_assert(_lib.DSA_generate_key(dsa) == 1) 333 _openssl_assert(_lib.EVP_PKEY_set1_DSA(self._pkey, dsa) == 1) 334 else: 335 raise Error("No such key type") 336 337 self._initialized = True 338 339 def check(self): 340 """ 341 Check the consistency of an RSA private key. 342 343 This is the Python equivalent of OpenSSL's ``RSA_check_key``. 344 345 :return: ``True`` if key is consistent. 346 347 :raise OpenSSL.crypto.Error: if the key is inconsistent. 348 349 :raise TypeError: if the key is of a type which cannot be checked. 350 Only RSA keys can currently be checked. 351 """ 352 if self._only_public: 353 raise TypeError("public key only") 354 355 if _lib.EVP_PKEY_type(self.type()) != _lib.EVP_PKEY_RSA: 356 raise TypeError("key type unsupported") 357 358 rsa = _lib.EVP_PKEY_get1_RSA(self._pkey) 359 rsa = _ffi.gc(rsa, _lib.RSA_free) 360 result = _lib.RSA_check_key(rsa) 361 if result == 1: 362 return True 363 _raise_current_error() 364 365 def type(self): 366 """ 367 Returns the type of the key 368 369 :return: The type of the key. 370 """ 371 return _lib.EVP_PKEY_id(self._pkey) 372 373 def bits(self): 374 """ 375 Returns the number of bits of the key 376 377 :return: The number of bits of the key. 378 """ 379 return _lib.EVP_PKEY_bits(self._pkey) 380 381 382class _EllipticCurve(object): 383 """ 384 A representation of a supported elliptic curve. 385 386 @cvar _curves: :py:obj:`None` until an attempt is made to load the curves. 387 Thereafter, a :py:type:`set` containing :py:type:`_EllipticCurve` 388 instances each of which represents one curve supported by the system. 389 @type _curves: :py:type:`NoneType` or :py:type:`set` 390 """ 391 392 _curves = None 393 394 if not _PY2: 395 # This only necessary on Python 3. Moreover, it is broken on Python 2. 396 def __ne__(self, other): 397 """ 398 Implement cooperation with the right-hand side argument of ``!=``. 399 400 Python 3 seems to have dropped this cooperation in this very narrow 401 circumstance. 402 """ 403 if isinstance(other, _EllipticCurve): 404 return super(_EllipticCurve, self).__ne__(other) 405 return NotImplemented 406 407 @classmethod 408 def _load_elliptic_curves(cls, lib): 409 """ 410 Get the curves supported by OpenSSL. 411 412 :param lib: The OpenSSL library binding object. 413 414 :return: A :py:type:`set` of ``cls`` instances giving the names of the 415 elliptic curves the underlying library supports. 416 """ 417 num_curves = lib.EC_get_builtin_curves(_ffi.NULL, 0) 418 builtin_curves = _ffi.new("EC_builtin_curve[]", num_curves) 419 # The return value on this call should be num_curves again. We 420 # could check it to make sure but if it *isn't* then.. what could 421 # we do? Abort the whole process, I suppose...? -exarkun 422 lib.EC_get_builtin_curves(builtin_curves, num_curves) 423 return set(cls.from_nid(lib, c.nid) for c in builtin_curves) 424 425 @classmethod 426 def _get_elliptic_curves(cls, lib): 427 """ 428 Get, cache, and return the curves supported by OpenSSL. 429 430 :param lib: The OpenSSL library binding object. 431 432 :return: A :py:type:`set` of ``cls`` instances giving the names of the 433 elliptic curves the underlying library supports. 434 """ 435 if cls._curves is None: 436 cls._curves = cls._load_elliptic_curves(lib) 437 return cls._curves 438 439 @classmethod 440 def from_nid(cls, lib, nid): 441 """ 442 Instantiate a new :py:class:`_EllipticCurve` associated with the given 443 OpenSSL NID. 444 445 :param lib: The OpenSSL library binding object. 446 447 :param nid: The OpenSSL NID the resulting curve object will represent. 448 This must be a curve NID (and not, for example, a hash NID) or 449 subsequent operations will fail in unpredictable ways. 450 :type nid: :py:class:`int` 451 452 :return: The curve object. 453 """ 454 return cls(lib, nid, _ffi.string(lib.OBJ_nid2sn(nid)).decode("ascii")) 455 456 def __init__(self, lib, nid, name): 457 """ 458 :param _lib: The :py:mod:`cryptography` binding instance used to 459 interface with OpenSSL. 460 461 :param _nid: The OpenSSL NID identifying the curve this object 462 represents. 463 :type _nid: :py:class:`int` 464 465 :param name: The OpenSSL short name identifying the curve this object 466 represents. 467 :type name: :py:class:`unicode` 468 """ 469 self._lib = lib 470 self._nid = nid 471 self.name = name 472 473 def __repr__(self): 474 return "<Curve %r>" % (self.name,) 475 476 def _to_EC_KEY(self): 477 """ 478 Create a new OpenSSL EC_KEY structure initialized to use this curve. 479 480 The structure is automatically garbage collected when the Python object 481 is garbage collected. 482 """ 483 key = self._lib.EC_KEY_new_by_curve_name(self._nid) 484 return _ffi.gc(key, _lib.EC_KEY_free) 485 486 487def get_elliptic_curves(): 488 """ 489 Return a set of objects representing the elliptic curves supported in the 490 OpenSSL build in use. 491 492 The curve objects have a :py:class:`unicode` ``name`` attribute by which 493 they identify themselves. 494 495 The curve objects are useful as values for the argument accepted by 496 :py:meth:`Context.set_tmp_ecdh` to specify which elliptical curve should be 497 used for ECDHE key exchange. 498 """ 499 return _EllipticCurve._get_elliptic_curves(_lib) 500 501 502def get_elliptic_curve(name): 503 """ 504 Return a single curve object selected by name. 505 506 See :py:func:`get_elliptic_curves` for information about curve objects. 507 508 :param name: The OpenSSL short name identifying the curve object to 509 retrieve. 510 :type name: :py:class:`unicode` 511 512 If the named curve is not supported then :py:class:`ValueError` is raised. 513 """ 514 for curve in get_elliptic_curves(): 515 if curve.name == name: 516 return curve 517 raise ValueError("unknown curve name", name) 518 519 520class X509Name(object): 521 """ 522 An X.509 Distinguished Name. 523 524 :ivar countryName: The country of the entity. 525 :ivar C: Alias for :py:attr:`countryName`. 526 527 :ivar stateOrProvinceName: The state or province of the entity. 528 :ivar ST: Alias for :py:attr:`stateOrProvinceName`. 529 530 :ivar localityName: The locality of the entity. 531 :ivar L: Alias for :py:attr:`localityName`. 532 533 :ivar organizationName: The organization name of the entity. 534 :ivar O: Alias for :py:attr:`organizationName`. 535 536 :ivar organizationalUnitName: The organizational unit of the entity. 537 :ivar OU: Alias for :py:attr:`organizationalUnitName` 538 539 :ivar commonName: The common name of the entity. 540 :ivar CN: Alias for :py:attr:`commonName`. 541 542 :ivar emailAddress: The e-mail address of the entity. 543 """ 544 545 def __init__(self, name): 546 """ 547 Create a new X509Name, copying the given X509Name instance. 548 549 :param name: The name to copy. 550 :type name: :py:class:`X509Name` 551 """ 552 name = _lib.X509_NAME_dup(name._name) 553 self._name = _ffi.gc(name, _lib.X509_NAME_free) 554 555 def __setattr__(self, name, value): 556 if name.startswith("_"): 557 return super(X509Name, self).__setattr__(name, value) 558 559 # Note: we really do not want str subclasses here, so we do not use 560 # isinstance. 561 if type(name) is not str: 562 raise TypeError( 563 "attribute name must be string, not '%.200s'" 564 % (type(value).__name__,) 565 ) 566 567 nid = _lib.OBJ_txt2nid(_byte_string(name)) 568 if nid == _lib.NID_undef: 569 try: 570 _raise_current_error() 571 except Error: 572 pass 573 raise AttributeError("No such attribute") 574 575 # If there's an old entry for this NID, remove it 576 for i in range(_lib.X509_NAME_entry_count(self._name)): 577 ent = _lib.X509_NAME_get_entry(self._name, i) 578 ent_obj = _lib.X509_NAME_ENTRY_get_object(ent) 579 ent_nid = _lib.OBJ_obj2nid(ent_obj) 580 if nid == ent_nid: 581 ent = _lib.X509_NAME_delete_entry(self._name, i) 582 _lib.X509_NAME_ENTRY_free(ent) 583 break 584 585 if isinstance(value, _text_type): 586 value = value.encode("utf-8") 587 588 add_result = _lib.X509_NAME_add_entry_by_NID( 589 self._name, nid, _lib.MBSTRING_UTF8, value, -1, -1, 0 590 ) 591 if not add_result: 592 _raise_current_error() 593 594 def __getattr__(self, name): 595 """ 596 Find attribute. An X509Name object has the following attributes: 597 countryName (alias C), stateOrProvince (alias ST), locality (alias L), 598 organization (alias O), organizationalUnit (alias OU), commonName 599 (alias CN) and more... 600 """ 601 nid = _lib.OBJ_txt2nid(_byte_string(name)) 602 if nid == _lib.NID_undef: 603 # This is a bit weird. OBJ_txt2nid indicated failure, but it seems 604 # a lower level function, a2d_ASN1_OBJECT, also feels the need to 605 # push something onto the error queue. If we don't clean that up 606 # now, someone else will bump into it later and be quite confused. 607 # See lp#314814. 608 try: 609 _raise_current_error() 610 except Error: 611 pass 612 return super(X509Name, self).__getattr__(name) 613 614 entry_index = _lib.X509_NAME_get_index_by_NID(self._name, nid, -1) 615 if entry_index == -1: 616 return None 617 618 entry = _lib.X509_NAME_get_entry(self._name, entry_index) 619 data = _lib.X509_NAME_ENTRY_get_data(entry) 620 621 result_buffer = _ffi.new("unsigned char**") 622 data_length = _lib.ASN1_STRING_to_UTF8(result_buffer, data) 623 _openssl_assert(data_length >= 0) 624 625 try: 626 result = _ffi.buffer(result_buffer[0], data_length)[:].decode( 627 "utf-8" 628 ) 629 finally: 630 # XXX untested 631 _lib.OPENSSL_free(result_buffer[0]) 632 return result 633 634 def _cmp(op): 635 def f(self, other): 636 if not isinstance(other, X509Name): 637 return NotImplemented 638 result = _lib.X509_NAME_cmp(self._name, other._name) 639 return op(result, 0) 640 641 return f 642 643 __eq__ = _cmp(__eq__) 644 __ne__ = _cmp(__ne__) 645 646 __lt__ = _cmp(__lt__) 647 __le__ = _cmp(__le__) 648 649 __gt__ = _cmp(__gt__) 650 __ge__ = _cmp(__ge__) 651 652 def __repr__(self): 653 """ 654 String representation of an X509Name 655 """ 656 result_buffer = _ffi.new("char[]", 512) 657 format_result = _lib.X509_NAME_oneline( 658 self._name, result_buffer, len(result_buffer) 659 ) 660 _openssl_assert(format_result != _ffi.NULL) 661 662 return "<X509Name object '%s'>" % ( 663 _native(_ffi.string(result_buffer)), 664 ) 665 666 def hash(self): 667 """ 668 Return an integer representation of the first four bytes of the 669 MD5 digest of the DER representation of the name. 670 671 This is the Python equivalent of OpenSSL's ``X509_NAME_hash``. 672 673 :return: The (integer) hash of this name. 674 :rtype: :py:class:`int` 675 """ 676 return _lib.X509_NAME_hash(self._name) 677 678 def der(self): 679 """ 680 Return the DER encoding of this name. 681 682 :return: The DER encoded form of this name. 683 :rtype: :py:class:`bytes` 684 """ 685 result_buffer = _ffi.new("unsigned char**") 686 encode_result = _lib.i2d_X509_NAME(self._name, result_buffer) 687 _openssl_assert(encode_result >= 0) 688 689 string_result = _ffi.buffer(result_buffer[0], encode_result)[:] 690 _lib.OPENSSL_free(result_buffer[0]) 691 return string_result 692 693 def get_components(self): 694 """ 695 Returns the components of this name, as a sequence of 2-tuples. 696 697 :return: The components of this name. 698 :rtype: :py:class:`list` of ``name, value`` tuples. 699 """ 700 result = [] 701 for i in range(_lib.X509_NAME_entry_count(self._name)): 702 ent = _lib.X509_NAME_get_entry(self._name, i) 703 704 fname = _lib.X509_NAME_ENTRY_get_object(ent) 705 fval = _lib.X509_NAME_ENTRY_get_data(ent) 706 707 nid = _lib.OBJ_obj2nid(fname) 708 name = _lib.OBJ_nid2sn(nid) 709 710 # ffi.string does not handle strings containing NULL bytes 711 # (which may have been generated by old, broken software) 712 value = _ffi.buffer( 713 _lib.ASN1_STRING_data(fval), _lib.ASN1_STRING_length(fval) 714 )[:] 715 result.append((_ffi.string(name), value)) 716 717 return result 718 719 720class X509Extension(object): 721 """ 722 An X.509 v3 certificate extension. 723 """ 724 725 def __init__(self, type_name, critical, value, subject=None, issuer=None): 726 """ 727 Initializes an X509 extension. 728 729 :param type_name: The name of the type of extension_ to create. 730 :type type_name: :py:data:`bytes` 731 732 :param bool critical: A flag indicating whether this is a critical 733 extension. 734 735 :param value: The value of the extension. 736 :type value: :py:data:`bytes` 737 738 :param subject: Optional X509 certificate to use as subject. 739 :type subject: :py:class:`X509` 740 741 :param issuer: Optional X509 certificate to use as issuer. 742 :type issuer: :py:class:`X509` 743 744 .. _extension: https://www.openssl.org/docs/manmaster/man5/ 745 x509v3_config.html#STANDARD-EXTENSIONS 746 """ 747 ctx = _ffi.new("X509V3_CTX*") 748 749 # A context is necessary for any extension which uses the r2i 750 # conversion method. That is, X509V3_EXT_nconf may segfault if passed 751 # a NULL ctx. Start off by initializing most of the fields to NULL. 752 _lib.X509V3_set_ctx(ctx, _ffi.NULL, _ffi.NULL, _ffi.NULL, _ffi.NULL, 0) 753 754 # We have no configuration database - but perhaps we should (some 755 # extensions may require it). 756 _lib.X509V3_set_ctx_nodb(ctx) 757 758 # Initialize the subject and issuer, if appropriate. ctx is a local, 759 # and as far as I can tell none of the X509V3_* APIs invoked here steal 760 # any references, so no need to mess with reference counts or 761 # duplicates. 762 if issuer is not None: 763 if not isinstance(issuer, X509): 764 raise TypeError("issuer must be an X509 instance") 765 ctx.issuer_cert = issuer._x509 766 if subject is not None: 767 if not isinstance(subject, X509): 768 raise TypeError("subject must be an X509 instance") 769 ctx.subject_cert = subject._x509 770 771 if critical: 772 # There are other OpenSSL APIs which would let us pass in critical 773 # separately, but they're harder to use, and since value is already 774 # a pile of crappy junk smuggling a ton of utterly important 775 # structured data, what's the point of trying to avoid nasty stuff 776 # with strings? (However, X509V3_EXT_i2d in particular seems like 777 # it would be a better API to invoke. I do not know where to get 778 # the ext_struc it desires for its last parameter, though.) 779 value = b"critical," + value 780 781 extension = _lib.X509V3_EXT_nconf(_ffi.NULL, ctx, type_name, value) 782 if extension == _ffi.NULL: 783 _raise_current_error() 784 self._extension = _ffi.gc(extension, _lib.X509_EXTENSION_free) 785 786 @property 787 def _nid(self): 788 return _lib.OBJ_obj2nid( 789 _lib.X509_EXTENSION_get_object(self._extension) 790 ) 791 792 _prefixes = { 793 _lib.GEN_EMAIL: "email", 794 _lib.GEN_DNS: "DNS", 795 _lib.GEN_URI: "URI", 796 } 797 798 def _subjectAltNameString(self): 799 names = _ffi.cast( 800 "GENERAL_NAMES*", _lib.X509V3_EXT_d2i(self._extension) 801 ) 802 803 names = _ffi.gc(names, _lib.GENERAL_NAMES_free) 804 parts = [] 805 for i in range(_lib.sk_GENERAL_NAME_num(names)): 806 name = _lib.sk_GENERAL_NAME_value(names, i) 807 try: 808 label = self._prefixes[name.type] 809 except KeyError: 810 bio = _new_mem_buf() 811 _lib.GENERAL_NAME_print(bio, name) 812 parts.append(_native(_bio_to_string(bio))) 813 else: 814 value = _native( 815 _ffi.buffer(name.d.ia5.data, name.d.ia5.length)[:] 816 ) 817 parts.append(label + ":" + value) 818 return ", ".join(parts) 819 820 def __str__(self): 821 """ 822 :return: a nice text representation of the extension 823 """ 824 if _lib.NID_subject_alt_name == self._nid: 825 return self._subjectAltNameString() 826 827 bio = _new_mem_buf() 828 print_result = _lib.X509V3_EXT_print(bio, self._extension, 0, 0) 829 _openssl_assert(print_result != 0) 830 831 return _native(_bio_to_string(bio)) 832 833 def get_critical(self): 834 """ 835 Returns the critical field of this X.509 extension. 836 837 :return: The critical field. 838 """ 839 return _lib.X509_EXTENSION_get_critical(self._extension) 840 841 def get_short_name(self): 842 """ 843 Returns the short type name of this X.509 extension. 844 845 The result is a byte string such as :py:const:`b"basicConstraints"`. 846 847 :return: The short type name. 848 :rtype: :py:data:`bytes` 849 850 .. versionadded:: 0.12 851 """ 852 obj = _lib.X509_EXTENSION_get_object(self._extension) 853 nid = _lib.OBJ_obj2nid(obj) 854 return _ffi.string(_lib.OBJ_nid2sn(nid)) 855 856 def get_data(self): 857 """ 858 Returns the data of the X509 extension, encoded as ASN.1. 859 860 :return: The ASN.1 encoded data of this X509 extension. 861 :rtype: :py:data:`bytes` 862 863 .. versionadded:: 0.12 864 """ 865 octet_result = _lib.X509_EXTENSION_get_data(self._extension) 866 string_result = _ffi.cast("ASN1_STRING*", octet_result) 867 char_result = _lib.ASN1_STRING_data(string_result) 868 result_length = _lib.ASN1_STRING_length(string_result) 869 return _ffi.buffer(char_result, result_length)[:] 870 871 872class X509Req(object): 873 """ 874 An X.509 certificate signing requests. 875 """ 876 877 def __init__(self): 878 req = _lib.X509_REQ_new() 879 self._req = _ffi.gc(req, _lib.X509_REQ_free) 880 # Default to version 0. 881 self.set_version(0) 882 883 def to_cryptography(self): 884 """ 885 Export as a ``cryptography`` certificate signing request. 886 887 :rtype: ``cryptography.x509.CertificateSigningRequest`` 888 889 .. versionadded:: 17.1.0 890 """ 891 from cryptography.hazmat.backends.openssl.x509 import ( 892 _CertificateSigningRequest, 893 ) 894 895 backend = _get_backend() 896 return _CertificateSigningRequest(backend, self._req) 897 898 @classmethod 899 def from_cryptography(cls, crypto_req): 900 """ 901 Construct based on a ``cryptography`` *crypto_req*. 902 903 :param crypto_req: A ``cryptography`` X.509 certificate signing request 904 :type crypto_req: ``cryptography.x509.CertificateSigningRequest`` 905 906 :rtype: X509Req 907 908 .. versionadded:: 17.1.0 909 """ 910 if not isinstance(crypto_req, x509.CertificateSigningRequest): 911 raise TypeError("Must be a certificate signing request") 912 913 req = cls() 914 req._req = crypto_req._x509_req 915 return req 916 917 def set_pubkey(self, pkey): 918 """ 919 Set the public key of the certificate signing request. 920 921 :param pkey: The public key to use. 922 :type pkey: :py:class:`PKey` 923 924 :return: ``None`` 925 """ 926 set_result = _lib.X509_REQ_set_pubkey(self._req, pkey._pkey) 927 _openssl_assert(set_result == 1) 928 929 def get_pubkey(self): 930 """ 931 Get the public key of the certificate signing request. 932 933 :return: The public key. 934 :rtype: :py:class:`PKey` 935 """ 936 pkey = PKey.__new__(PKey) 937 pkey._pkey = _lib.X509_REQ_get_pubkey(self._req) 938 _openssl_assert(pkey._pkey != _ffi.NULL) 939 pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free) 940 pkey._only_public = True 941 return pkey 942 943 def set_version(self, version): 944 """ 945 Set the version subfield (RFC 2459, section 4.1.2.1) of the certificate 946 request. 947 948 :param int version: The version number. 949 :return: ``None`` 950 """ 951 set_result = _lib.X509_REQ_set_version(self._req, version) 952 _openssl_assert(set_result == 1) 953 954 def get_version(self): 955 """ 956 Get the version subfield (RFC 2459, section 4.1.2.1) of the certificate 957 request. 958 959 :return: The value of the version subfield. 960 :rtype: :py:class:`int` 961 """ 962 return _lib.X509_REQ_get_version(self._req) 963 964 def get_subject(self): 965 """ 966 Return the subject of this certificate signing request. 967 968 This creates a new :class:`X509Name` that wraps the underlying subject 969 name field on the certificate signing request. Modifying it will modify 970 the underlying signing request, and will have the effect of modifying 971 any other :class:`X509Name` that refers to this subject. 972 973 :return: The subject of this certificate signing request. 974 :rtype: :class:`X509Name` 975 """ 976 name = X509Name.__new__(X509Name) 977 name._name = _lib.X509_REQ_get_subject_name(self._req) 978 _openssl_assert(name._name != _ffi.NULL) 979 980 # The name is owned by the X509Req structure. As long as the X509Name 981 # Python object is alive, keep the X509Req Python object alive. 982 name._owner = self 983 984 return name 985 986 def add_extensions(self, extensions): 987 """ 988 Add extensions to the certificate signing request. 989 990 :param extensions: The X.509 extensions to add. 991 :type extensions: iterable of :py:class:`X509Extension` 992 :return: ``None`` 993 """ 994 stack = _lib.sk_X509_EXTENSION_new_null() 995 _openssl_assert(stack != _ffi.NULL) 996 997 stack = _ffi.gc(stack, _lib.sk_X509_EXTENSION_free) 998 999 for ext in extensions: 1000 if not isinstance(ext, X509Extension): 1001 raise ValueError("One of the elements is not an X509Extension") 1002 1003 # TODO push can fail (here and elsewhere) 1004 _lib.sk_X509_EXTENSION_push(stack, ext._extension) 1005 1006 add_result = _lib.X509_REQ_add_extensions(self._req, stack) 1007 _openssl_assert(add_result == 1) 1008 1009 def get_extensions(self): 1010 """ 1011 Get X.509 extensions in the certificate signing request. 1012 1013 :return: The X.509 extensions in this request. 1014 :rtype: :py:class:`list` of :py:class:`X509Extension` objects. 1015 1016 .. versionadded:: 0.15 1017 """ 1018 exts = [] 1019 native_exts_obj = _lib.X509_REQ_get_extensions(self._req) 1020 native_exts_obj = _ffi.gc( 1021 native_exts_obj, 1022 lambda x: _lib.sk_X509_EXTENSION_pop_free( 1023 x, 1024 _ffi.addressof(_lib._original_lib, "X509_EXTENSION_free"), 1025 ), 1026 ) 1027 1028 for i in range(_lib.sk_X509_EXTENSION_num(native_exts_obj)): 1029 ext = X509Extension.__new__(X509Extension) 1030 extension = _lib.X509_EXTENSION_dup( 1031 _lib.sk_X509_EXTENSION_value(native_exts_obj, i) 1032 ) 1033 ext._extension = _ffi.gc(extension, _lib.X509_EXTENSION_free) 1034 exts.append(ext) 1035 return exts 1036 1037 def sign(self, pkey, digest): 1038 """ 1039 Sign the certificate signing request with this key and digest type. 1040 1041 :param pkey: The key pair to sign with. 1042 :type pkey: :py:class:`PKey` 1043 :param digest: The name of the message digest to use for the signature, 1044 e.g. :py:data:`b"sha256"`. 1045 :type digest: :py:class:`bytes` 1046 :return: ``None`` 1047 """ 1048 if pkey._only_public: 1049 raise ValueError("Key has only public part") 1050 1051 if not pkey._initialized: 1052 raise ValueError("Key is uninitialized") 1053 1054 digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest)) 1055 if digest_obj == _ffi.NULL: 1056 raise ValueError("No such digest method") 1057 1058 sign_result = _lib.X509_REQ_sign(self._req, pkey._pkey, digest_obj) 1059 _openssl_assert(sign_result > 0) 1060 1061 def verify(self, pkey): 1062 """ 1063 Verifies the signature on this certificate signing request. 1064 1065 :param PKey key: A public key. 1066 1067 :return: ``True`` if the signature is correct. 1068 :rtype: bool 1069 1070 :raises OpenSSL.crypto.Error: If the signature is invalid or there is a 1071 problem verifying the signature. 1072 """ 1073 if not isinstance(pkey, PKey): 1074 raise TypeError("pkey must be a PKey instance") 1075 1076 result = _lib.X509_REQ_verify(self._req, pkey._pkey) 1077 if result <= 0: 1078 _raise_current_error() 1079 1080 return result 1081 1082 1083class X509(object): 1084 """ 1085 An X.509 certificate. 1086 """ 1087 1088 def __init__(self): 1089 x509 = _lib.X509_new() 1090 _openssl_assert(x509 != _ffi.NULL) 1091 self._x509 = _ffi.gc(x509, _lib.X509_free) 1092 1093 self._issuer_invalidator = _X509NameInvalidator() 1094 self._subject_invalidator = _X509NameInvalidator() 1095 1096 @classmethod 1097 def _from_raw_x509_ptr(cls, x509): 1098 cert = cls.__new__(cls) 1099 cert._x509 = _ffi.gc(x509, _lib.X509_free) 1100 cert._issuer_invalidator = _X509NameInvalidator() 1101 cert._subject_invalidator = _X509NameInvalidator() 1102 return cert 1103 1104 def to_cryptography(self): 1105 """ 1106 Export as a ``cryptography`` certificate. 1107 1108 :rtype: ``cryptography.x509.Certificate`` 1109 1110 .. versionadded:: 17.1.0 1111 """ 1112 from cryptography.hazmat.backends.openssl.x509 import _Certificate 1113 1114 backend = _get_backend() 1115 return _Certificate(backend, self._x509) 1116 1117 @classmethod 1118 def from_cryptography(cls, crypto_cert): 1119 """ 1120 Construct based on a ``cryptography`` *crypto_cert*. 1121 1122 :param crypto_key: A ``cryptography`` X.509 certificate. 1123 :type crypto_key: ``cryptography.x509.Certificate`` 1124 1125 :rtype: X509 1126 1127 .. versionadded:: 17.1.0 1128 """ 1129 if not isinstance(crypto_cert, x509.Certificate): 1130 raise TypeError("Must be a certificate") 1131 1132 cert = cls() 1133 cert._x509 = crypto_cert._x509 1134 return cert 1135 1136 def set_version(self, version): 1137 """ 1138 Set the version number of the certificate. Note that the 1139 version value is zero-based, eg. a value of 0 is V1. 1140 1141 :param version: The version number of the certificate. 1142 :type version: :py:class:`int` 1143 1144 :return: ``None`` 1145 """ 1146 if not isinstance(version, int): 1147 raise TypeError("version must be an integer") 1148 1149 _lib.X509_set_version(self._x509, version) 1150 1151 def get_version(self): 1152 """ 1153 Return the version number of the certificate. 1154 1155 :return: The version number of the certificate. 1156 :rtype: :py:class:`int` 1157 """ 1158 return _lib.X509_get_version(self._x509) 1159 1160 def get_pubkey(self): 1161 """ 1162 Get the public key of the certificate. 1163 1164 :return: The public key. 1165 :rtype: :py:class:`PKey` 1166 """ 1167 pkey = PKey.__new__(PKey) 1168 pkey._pkey = _lib.X509_get_pubkey(self._x509) 1169 if pkey._pkey == _ffi.NULL: 1170 _raise_current_error() 1171 pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free) 1172 pkey._only_public = True 1173 return pkey 1174 1175 def set_pubkey(self, pkey): 1176 """ 1177 Set the public key of the certificate. 1178 1179 :param pkey: The public key. 1180 :type pkey: :py:class:`PKey` 1181 1182 :return: :py:data:`None` 1183 """ 1184 if not isinstance(pkey, PKey): 1185 raise TypeError("pkey must be a PKey instance") 1186 1187 set_result = _lib.X509_set_pubkey(self._x509, pkey._pkey) 1188 _openssl_assert(set_result == 1) 1189 1190 def sign(self, pkey, digest): 1191 """ 1192 Sign the certificate with this key and digest type. 1193 1194 :param pkey: The key to sign with. 1195 :type pkey: :py:class:`PKey` 1196 1197 :param digest: The name of the message digest to use. 1198 :type digest: :py:class:`bytes` 1199 1200 :return: :py:data:`None` 1201 """ 1202 if not isinstance(pkey, PKey): 1203 raise TypeError("pkey must be a PKey instance") 1204 1205 if pkey._only_public: 1206 raise ValueError("Key only has public part") 1207 1208 if not pkey._initialized: 1209 raise ValueError("Key is uninitialized") 1210 1211 evp_md = _lib.EVP_get_digestbyname(_byte_string(digest)) 1212 if evp_md == _ffi.NULL: 1213 raise ValueError("No such digest method") 1214 1215 sign_result = _lib.X509_sign(self._x509, pkey._pkey, evp_md) 1216 _openssl_assert(sign_result > 0) 1217 1218 def get_signature_algorithm(self): 1219 """ 1220 Return the signature algorithm used in the certificate. 1221 1222 :return: The name of the algorithm. 1223 :rtype: :py:class:`bytes` 1224 1225 :raises ValueError: If the signature algorithm is undefined. 1226 1227 .. versionadded:: 0.13 1228 """ 1229 algor = _lib.X509_get0_tbs_sigalg(self._x509) 1230 nid = _lib.OBJ_obj2nid(algor.algorithm) 1231 if nid == _lib.NID_undef: 1232 raise ValueError("Undefined signature algorithm") 1233 return _ffi.string(_lib.OBJ_nid2ln(nid)) 1234 1235 def digest(self, digest_name): 1236 """ 1237 Return the digest of the X509 object. 1238 1239 :param digest_name: The name of the digest algorithm to use. 1240 :type digest_name: :py:class:`bytes` 1241 1242 :return: The digest of the object, formatted as 1243 :py:const:`b":"`-delimited hex pairs. 1244 :rtype: :py:class:`bytes` 1245 """ 1246 digest = _lib.EVP_get_digestbyname(_byte_string(digest_name)) 1247 if digest == _ffi.NULL: 1248 raise ValueError("No such digest method") 1249 1250 result_buffer = _ffi.new("unsigned char[]", _lib.EVP_MAX_MD_SIZE) 1251 result_length = _ffi.new("unsigned int[]", 1) 1252 result_length[0] = len(result_buffer) 1253 1254 digest_result = _lib.X509_digest( 1255 self._x509, digest, result_buffer, result_length 1256 ) 1257 _openssl_assert(digest_result == 1) 1258 1259 return b":".join( 1260 [ 1261 b16encode(ch).upper() 1262 for ch in _ffi.buffer(result_buffer, result_length[0]) 1263 ] 1264 ) 1265 1266 def subject_name_hash(self): 1267 """ 1268 Return the hash of the X509 subject. 1269 1270 :return: The hash of the subject. 1271 :rtype: :py:class:`bytes` 1272 """ 1273 return _lib.X509_subject_name_hash(self._x509) 1274 1275 def set_serial_number(self, serial): 1276 """ 1277 Set the serial number of the certificate. 1278 1279 :param serial: The new serial number. 1280 :type serial: :py:class:`int` 1281 1282 :return: :py:data`None` 1283 """ 1284 if not isinstance(serial, _integer_types): 1285 raise TypeError("serial must be an integer") 1286 1287 hex_serial = hex(serial)[2:] 1288 if not isinstance(hex_serial, bytes): 1289 hex_serial = hex_serial.encode("ascii") 1290 1291 bignum_serial = _ffi.new("BIGNUM**") 1292 1293 # BN_hex2bn stores the result in &bignum. Unless it doesn't feel like 1294 # it. If bignum is still NULL after this call, then the return value 1295 # is actually the result. I hope. -exarkun 1296 small_serial = _lib.BN_hex2bn(bignum_serial, hex_serial) 1297 1298 if bignum_serial[0] == _ffi.NULL: 1299 set_result = _lib.ASN1_INTEGER_set( 1300 _lib.X509_get_serialNumber(self._x509), small_serial 1301 ) 1302 if set_result: 1303 # TODO Not tested 1304 _raise_current_error() 1305 else: 1306 asn1_serial = _lib.BN_to_ASN1_INTEGER(bignum_serial[0], _ffi.NULL) 1307 _lib.BN_free(bignum_serial[0]) 1308 if asn1_serial == _ffi.NULL: 1309 # TODO Not tested 1310 _raise_current_error() 1311 asn1_serial = _ffi.gc(asn1_serial, _lib.ASN1_INTEGER_free) 1312 set_result = _lib.X509_set_serialNumber(self._x509, asn1_serial) 1313 _openssl_assert(set_result == 1) 1314 1315 def get_serial_number(self): 1316 """ 1317 Return the serial number of this certificate. 1318 1319 :return: The serial number. 1320 :rtype: int 1321 """ 1322 asn1_serial = _lib.X509_get_serialNumber(self._x509) 1323 bignum_serial = _lib.ASN1_INTEGER_to_BN(asn1_serial, _ffi.NULL) 1324 try: 1325 hex_serial = _lib.BN_bn2hex(bignum_serial) 1326 try: 1327 hexstring_serial = _ffi.string(hex_serial) 1328 serial = int(hexstring_serial, 16) 1329 return serial 1330 finally: 1331 _lib.OPENSSL_free(hex_serial) 1332 finally: 1333 _lib.BN_free(bignum_serial) 1334 1335 def gmtime_adj_notAfter(self, amount): 1336 """ 1337 Adjust the time stamp on which the certificate stops being valid. 1338 1339 :param int amount: The number of seconds by which to adjust the 1340 timestamp. 1341 :return: ``None`` 1342 """ 1343 if not isinstance(amount, int): 1344 raise TypeError("amount must be an integer") 1345 1346 notAfter = _lib.X509_getm_notAfter(self._x509) 1347 _lib.X509_gmtime_adj(notAfter, amount) 1348 1349 def gmtime_adj_notBefore(self, amount): 1350 """ 1351 Adjust the timestamp on which the certificate starts being valid. 1352 1353 :param amount: The number of seconds by which to adjust the timestamp. 1354 :return: ``None`` 1355 """ 1356 if not isinstance(amount, int): 1357 raise TypeError("amount must be an integer") 1358 1359 notBefore = _lib.X509_getm_notBefore(self._x509) 1360 _lib.X509_gmtime_adj(notBefore, amount) 1361 1362 def has_expired(self): 1363 """ 1364 Check whether the certificate has expired. 1365 1366 :return: ``True`` if the certificate has expired, ``False`` otherwise. 1367 :rtype: bool 1368 """ 1369 time_string = _native(self.get_notAfter()) 1370 not_after = datetime.datetime.strptime(time_string, "%Y%m%d%H%M%SZ") 1371 1372 return not_after < datetime.datetime.utcnow() 1373 1374 def _get_boundary_time(self, which): 1375 return _get_asn1_time(which(self._x509)) 1376 1377 def get_notBefore(self): 1378 """ 1379 Get the timestamp at which the certificate starts being valid. 1380 1381 The timestamp is formatted as an ASN.1 TIME:: 1382 1383 YYYYMMDDhhmmssZ 1384 1385 :return: A timestamp string, or ``None`` if there is none. 1386 :rtype: bytes or NoneType 1387 """ 1388 return self._get_boundary_time(_lib.X509_getm_notBefore) 1389 1390 def _set_boundary_time(self, which, when): 1391 return _set_asn1_time(which(self._x509), when) 1392 1393 def set_notBefore(self, when): 1394 """ 1395 Set the timestamp at which the certificate starts being valid. 1396 1397 The timestamp is formatted as an ASN.1 TIME:: 1398 1399 YYYYMMDDhhmmssZ 1400 1401 :param bytes when: A timestamp string. 1402 :return: ``None`` 1403 """ 1404 return self._set_boundary_time(_lib.X509_getm_notBefore, when) 1405 1406 def get_notAfter(self): 1407 """ 1408 Get the timestamp at which the certificate stops being valid. 1409 1410 The timestamp is formatted as an ASN.1 TIME:: 1411 1412 YYYYMMDDhhmmssZ 1413 1414 :return: A timestamp string, or ``None`` if there is none. 1415 :rtype: bytes or NoneType 1416 """ 1417 return self._get_boundary_time(_lib.X509_getm_notAfter) 1418 1419 def set_notAfter(self, when): 1420 """ 1421 Set the timestamp at which the certificate stops being valid. 1422 1423 The timestamp is formatted as an ASN.1 TIME:: 1424 1425 YYYYMMDDhhmmssZ 1426 1427 :param bytes when: A timestamp string. 1428 :return: ``None`` 1429 """ 1430 return self._set_boundary_time(_lib.X509_getm_notAfter, when) 1431 1432 def _get_name(self, which): 1433 name = X509Name.__new__(X509Name) 1434 name._name = which(self._x509) 1435 _openssl_assert(name._name != _ffi.NULL) 1436 1437 # The name is owned by the X509 structure. As long as the X509Name 1438 # Python object is alive, keep the X509 Python object alive. 1439 name._owner = self 1440 1441 return name 1442 1443 def _set_name(self, which, name): 1444 if not isinstance(name, X509Name): 1445 raise TypeError("name must be an X509Name") 1446 set_result = which(self._x509, name._name) 1447 _openssl_assert(set_result == 1) 1448 1449 def get_issuer(self): 1450 """ 1451 Return the issuer of this certificate. 1452 1453 This creates a new :class:`X509Name` that wraps the underlying issuer 1454 name field on the certificate. Modifying it will modify the underlying 1455 certificate, and will have the effect of modifying any other 1456 :class:`X509Name` that refers to this issuer. 1457 1458 :return: The issuer of this certificate. 1459 :rtype: :class:`X509Name` 1460 """ 1461 name = self._get_name(_lib.X509_get_issuer_name) 1462 self._issuer_invalidator.add(name) 1463 return name 1464 1465 def set_issuer(self, issuer): 1466 """ 1467 Set the issuer of this certificate. 1468 1469 :param issuer: The issuer. 1470 :type issuer: :py:class:`X509Name` 1471 1472 :return: ``None`` 1473 """ 1474 self._set_name(_lib.X509_set_issuer_name, issuer) 1475 self._issuer_invalidator.clear() 1476 1477 def get_subject(self): 1478 """ 1479 Return the subject of this certificate. 1480 1481 This creates a new :class:`X509Name` that wraps the underlying subject 1482 name field on the certificate. Modifying it will modify the underlying 1483 certificate, and will have the effect of modifying any other 1484 :class:`X509Name` that refers to this subject. 1485 1486 :return: The subject of this certificate. 1487 :rtype: :class:`X509Name` 1488 """ 1489 name = self._get_name(_lib.X509_get_subject_name) 1490 self._subject_invalidator.add(name) 1491 return name 1492 1493 def set_subject(self, subject): 1494 """ 1495 Set the subject of this certificate. 1496 1497 :param subject: The subject. 1498 :type subject: :py:class:`X509Name` 1499 1500 :return: ``None`` 1501 """ 1502 self._set_name(_lib.X509_set_subject_name, subject) 1503 self._subject_invalidator.clear() 1504 1505 def get_extension_count(self): 1506 """ 1507 Get the number of extensions on this certificate. 1508 1509 :return: The number of extensions. 1510 :rtype: :py:class:`int` 1511 1512 .. versionadded:: 0.12 1513 """ 1514 return _lib.X509_get_ext_count(self._x509) 1515 1516 def add_extensions(self, extensions): 1517 """ 1518 Add extensions to the certificate. 1519 1520 :param extensions: The extensions to add. 1521 :type extensions: An iterable of :py:class:`X509Extension` objects. 1522 :return: ``None`` 1523 """ 1524 for ext in extensions: 1525 if not isinstance(ext, X509Extension): 1526 raise ValueError("One of the elements is not an X509Extension") 1527 1528 add_result = _lib.X509_add_ext(self._x509, ext._extension, -1) 1529 if not add_result: 1530 _raise_current_error() 1531 1532 def get_extension(self, index): 1533 """ 1534 Get a specific extension of the certificate by index. 1535 1536 Extensions on a certificate are kept in order. The index 1537 parameter selects which extension will be returned. 1538 1539 :param int index: The index of the extension to retrieve. 1540 :return: The extension at the specified index. 1541 :rtype: :py:class:`X509Extension` 1542 :raises IndexError: If the extension index was out of bounds. 1543 1544 .. versionadded:: 0.12 1545 """ 1546 ext = X509Extension.__new__(X509Extension) 1547 ext._extension = _lib.X509_get_ext(self._x509, index) 1548 if ext._extension == _ffi.NULL: 1549 raise IndexError("extension index out of bounds") 1550 1551 extension = _lib.X509_EXTENSION_dup(ext._extension) 1552 ext._extension = _ffi.gc(extension, _lib.X509_EXTENSION_free) 1553 return ext 1554 1555 1556class X509StoreFlags(object): 1557 """ 1558 Flags for X509 verification, used to change the behavior of 1559 :class:`X509Store`. 1560 1561 See `OpenSSL Verification Flags`_ for details. 1562 1563 .. _OpenSSL Verification Flags: 1564 https://www.openssl.org/docs/manmaster/man3/X509_VERIFY_PARAM_set_flags.html 1565 """ 1566 1567 CRL_CHECK = _lib.X509_V_FLAG_CRL_CHECK 1568 CRL_CHECK_ALL = _lib.X509_V_FLAG_CRL_CHECK_ALL 1569 IGNORE_CRITICAL = _lib.X509_V_FLAG_IGNORE_CRITICAL 1570 X509_STRICT = _lib.X509_V_FLAG_X509_STRICT 1571 ALLOW_PROXY_CERTS = _lib.X509_V_FLAG_ALLOW_PROXY_CERTS 1572 POLICY_CHECK = _lib.X509_V_FLAG_POLICY_CHECK 1573 EXPLICIT_POLICY = _lib.X509_V_FLAG_EXPLICIT_POLICY 1574 INHIBIT_MAP = _lib.X509_V_FLAG_INHIBIT_MAP 1575 NOTIFY_POLICY = _lib.X509_V_FLAG_NOTIFY_POLICY 1576 CHECK_SS_SIGNATURE = _lib.X509_V_FLAG_CHECK_SS_SIGNATURE 1577 CB_ISSUER_CHECK = _lib.X509_V_FLAG_CB_ISSUER_CHECK 1578 1579 1580class X509Store(object): 1581 """ 1582 An X.509 store. 1583 1584 An X.509 store is used to describe a context in which to verify a 1585 certificate. A description of a context may include a set of certificates 1586 to trust, a set of certificate revocation lists, verification flags and 1587 more. 1588 1589 An X.509 store, being only a description, cannot be used by itself to 1590 verify a certificate. To carry out the actual verification process, see 1591 :class:`X509StoreContext`. 1592 """ 1593 1594 def __init__(self): 1595 store = _lib.X509_STORE_new() 1596 self._store = _ffi.gc(store, _lib.X509_STORE_free) 1597 1598 def add_cert(self, cert): 1599 """ 1600 Adds a trusted certificate to this store. 1601 1602 Adding a certificate with this method adds this certificate as a 1603 *trusted* certificate. 1604 1605 :param X509 cert: The certificate to add to this store. 1606 1607 :raises TypeError: If the certificate is not an :class:`X509`. 1608 1609 :raises OpenSSL.crypto.Error: If OpenSSL was unhappy with your 1610 certificate. 1611 1612 :return: ``None`` if the certificate was added successfully. 1613 """ 1614 if not isinstance(cert, X509): 1615 raise TypeError() 1616 1617 res = _lib.X509_STORE_add_cert(self._store, cert._x509) 1618 _openssl_assert(res == 1) 1619 1620 def add_crl(self, crl): 1621 """ 1622 Add a certificate revocation list to this store. 1623 1624 The certificate revocation lists added to a store will only be used if 1625 the associated flags are configured to check certificate revocation 1626 lists. 1627 1628 .. versionadded:: 16.1.0 1629 1630 :param CRL crl: The certificate revocation list to add to this store. 1631 :return: ``None`` if the certificate revocation list was added 1632 successfully. 1633 """ 1634 _openssl_assert(_lib.X509_STORE_add_crl(self._store, crl._crl) != 0) 1635 1636 def set_flags(self, flags): 1637 """ 1638 Set verification flags to this store. 1639 1640 Verification flags can be combined by oring them together. 1641 1642 .. note:: 1643 1644 Setting a verification flag sometimes requires clients to add 1645 additional information to the store, otherwise a suitable error will 1646 be raised. 1647 1648 For example, in setting flags to enable CRL checking a 1649 suitable CRL must be added to the store otherwise an error will be 1650 raised. 1651 1652 .. versionadded:: 16.1.0 1653 1654 :param int flags: The verification flags to set on this store. 1655 See :class:`X509StoreFlags` for available constants. 1656 :return: ``None`` if the verification flags were successfully set. 1657 """ 1658 _openssl_assert(_lib.X509_STORE_set_flags(self._store, flags) != 0) 1659 1660 def set_time(self, vfy_time): 1661 """ 1662 Set the time against which the certificates are verified. 1663 1664 Normally the current time is used. 1665 1666 .. note:: 1667 1668 For example, you can determine if a certificate was valid at a given 1669 time. 1670 1671 .. versionadded:: 17.0.0 1672 1673 :param datetime vfy_time: The verification time to set on this store. 1674 :return: ``None`` if the verification time was successfully set. 1675 """ 1676 param = _lib.X509_VERIFY_PARAM_new() 1677 param = _ffi.gc(param, _lib.X509_VERIFY_PARAM_free) 1678 1679 _lib.X509_VERIFY_PARAM_set_time( 1680 param, calendar.timegm(vfy_time.timetuple()) 1681 ) 1682 _openssl_assert(_lib.X509_STORE_set1_param(self._store, param) != 0) 1683 1684 def load_locations(self, cafile, capath=None): 1685 """ 1686 Let X509Store know where we can find trusted certificates for the 1687 certificate chain. Note that the certificates have to be in PEM 1688 format. 1689 1690 If *capath* is passed, it must be a directory prepared using the 1691 ``c_rehash`` tool included with OpenSSL. Either, but not both, of 1692 *cafile* or *capath* may be ``None``. 1693 1694 .. note:: 1695 1696 Both *cafile* and *capath* may be set simultaneously. 1697 1698 Call this method multiple times to add more than one location. 1699 For example, CA certificates, and certificate revocation list bundles 1700 may be passed in *cafile* in subsequent calls to this method. 1701 1702 .. versionadded:: 20.0 1703 1704 :param cafile: In which file we can find the certificates (``bytes`` or 1705 ``unicode``). 1706 :param capath: In which directory we can find the certificates 1707 (``bytes`` or ``unicode``). 1708 1709 :return: ``None`` if the locations were set successfully. 1710 1711 :raises OpenSSL.crypto.Error: If both *cafile* and *capath* is ``None`` 1712 or the locations could not be set for any reason. 1713 1714 """ 1715 if cafile is None: 1716 cafile = _ffi.NULL 1717 else: 1718 cafile = _path_string(cafile) 1719 1720 if capath is None: 1721 capath = _ffi.NULL 1722 else: 1723 capath = _path_string(capath) 1724 1725 load_result = _lib.X509_STORE_load_locations( 1726 self._store, cafile, capath 1727 ) 1728 if not load_result: 1729 _raise_current_error() 1730 1731 1732class X509StoreContextError(Exception): 1733 """ 1734 An exception raised when an error occurred while verifying a certificate 1735 using `OpenSSL.X509StoreContext.verify_certificate`. 1736 1737 :ivar certificate: The certificate which caused verificate failure. 1738 :type certificate: :class:`X509` 1739 """ 1740 1741 def __init__(self, message, certificate): 1742 super(X509StoreContextError, self).__init__(message) 1743 self.certificate = certificate 1744 1745 1746class X509StoreContext(object): 1747 """ 1748 An X.509 store context. 1749 1750 An X.509 store context is used to carry out the actual verification process 1751 of a certificate in a described context. For describing such a context, see 1752 :class:`X509Store`. 1753 1754 :ivar _store_ctx: The underlying X509_STORE_CTX structure used by this 1755 instance. It is dynamically allocated and automatically garbage 1756 collected. 1757 :ivar _store: See the ``store`` ``__init__`` parameter. 1758 :ivar _cert: See the ``certificate`` ``__init__`` parameter. 1759 :ivar _chain: See the ``chain`` ``__init__`` parameter. 1760 :param X509Store store: The certificates which will be trusted for the 1761 purposes of any verifications. 1762 :param X509 certificate: The certificate to be verified. 1763 :param chain: List of untrusted certificates that may be used for building 1764 the certificate chain. May be ``None``. 1765 :type chain: :class:`list` of :class:`X509` 1766 """ 1767 1768 def __init__(self, store, certificate, chain=None): 1769 store_ctx = _lib.X509_STORE_CTX_new() 1770 self._store_ctx = _ffi.gc(store_ctx, _lib.X509_STORE_CTX_free) 1771 self._store = store 1772 self._cert = certificate 1773 self._chain = self._build_certificate_stack(chain) 1774 # Make the store context available for use after instantiating this 1775 # class by initializing it now. Per testing, subsequent calls to 1776 # :meth:`_init` have no adverse affect. 1777 self._init() 1778 1779 @staticmethod 1780 def _build_certificate_stack(certificates): 1781 def cleanup(s): 1782 # Equivalent to sk_X509_pop_free, but we don't 1783 # currently have a CFFI binding for that available 1784 for i in range(_lib.sk_X509_num(s)): 1785 x = _lib.sk_X509_value(s, i) 1786 _lib.X509_free(x) 1787 _lib.sk_X509_free(s) 1788 1789 if certificates is None or len(certificates) == 0: 1790 return _ffi.NULL 1791 1792 stack = _lib.sk_X509_new_null() 1793 _openssl_assert(stack != _ffi.NULL) 1794 stack = _ffi.gc(stack, cleanup) 1795 1796 for cert in certificates: 1797 if not isinstance(cert, X509): 1798 raise TypeError("One of the elements is not an X509 instance") 1799 1800 _openssl_assert(_lib.X509_up_ref(cert._x509) > 0) 1801 if _lib.sk_X509_push(stack, cert._x509) <= 0: 1802 _lib.X509_free(cert._x509) 1803 _raise_current_error() 1804 1805 return stack 1806 1807 def _init(self): 1808 """ 1809 Set up the store context for a subsequent verification operation. 1810 1811 Calling this method more than once without first calling 1812 :meth:`_cleanup` will leak memory. 1813 """ 1814 ret = _lib.X509_STORE_CTX_init( 1815 self._store_ctx, self._store._store, self._cert._x509, self._chain 1816 ) 1817 if ret <= 0: 1818 _raise_current_error() 1819 1820 def _cleanup(self): 1821 """ 1822 Internally cleans up the store context. 1823 1824 The store context can then be reused with a new call to :meth:`_init`. 1825 """ 1826 _lib.X509_STORE_CTX_cleanup(self._store_ctx) 1827 1828 def _exception_from_context(self): 1829 """ 1830 Convert an OpenSSL native context error failure into a Python 1831 exception. 1832 1833 When a call to native OpenSSL X509_verify_cert fails, additional 1834 information about the failure can be obtained from the store context. 1835 """ 1836 errors = [ 1837 _lib.X509_STORE_CTX_get_error(self._store_ctx), 1838 _lib.X509_STORE_CTX_get_error_depth(self._store_ctx), 1839 _native( 1840 _ffi.string( 1841 _lib.X509_verify_cert_error_string( 1842 _lib.X509_STORE_CTX_get_error(self._store_ctx) 1843 ) 1844 ) 1845 ), 1846 ] 1847 # A context error should always be associated with a certificate, so we 1848 # expect this call to never return :class:`None`. 1849 _x509 = _lib.X509_STORE_CTX_get_current_cert(self._store_ctx) 1850 _cert = _lib.X509_dup(_x509) 1851 pycert = X509._from_raw_x509_ptr(_cert) 1852 return X509StoreContextError(errors, pycert) 1853 1854 def set_store(self, store): 1855 """ 1856 Set the context's X.509 store. 1857 1858 .. versionadded:: 0.15 1859 1860 :param X509Store store: The store description which will be used for 1861 the purposes of any *future* verifications. 1862 """ 1863 self._store = store 1864 1865 def verify_certificate(self): 1866 """ 1867 Verify a certificate in a context. 1868 1869 .. versionadded:: 0.15 1870 1871 :raises X509StoreContextError: If an error occurred when validating a 1872 certificate in the context. Sets ``certificate`` attribute to 1873 indicate which certificate caused the error. 1874 """ 1875 # Always re-initialize the store context in case 1876 # :meth:`verify_certificate` is called multiple times. 1877 # 1878 # :meth:`_init` is called in :meth:`__init__` so _cleanup is called 1879 # before _init to ensure memory is not leaked. 1880 self._cleanup() 1881 self._init() 1882 ret = _lib.X509_verify_cert(self._store_ctx) 1883 self._cleanup() 1884 if ret <= 0: 1885 raise self._exception_from_context() 1886 1887 def get_verified_chain(self): 1888 """ 1889 Verify a certificate in a context and return the complete validated 1890 chain. 1891 1892 :raises X509StoreContextError: If an error occurred when validating a 1893 certificate in the context. Sets ``certificate`` attribute to 1894 indicate which certificate caused the error. 1895 1896 .. versionadded:: 20.0 1897 """ 1898 # Always re-initialize the store context in case 1899 # :meth:`verify_certificate` is called multiple times. 1900 # 1901 # :meth:`_init` is called in :meth:`__init__` so _cleanup is called 1902 # before _init to ensure memory is not leaked. 1903 self._cleanup() 1904 self._init() 1905 ret = _lib.X509_verify_cert(self._store_ctx) 1906 if ret <= 0: 1907 self._cleanup() 1908 raise self._exception_from_context() 1909 1910 # Note: X509_STORE_CTX_get1_chain returns a deep copy of the chain. 1911 cert_stack = _lib.X509_STORE_CTX_get1_chain(self._store_ctx) 1912 _openssl_assert(cert_stack != _ffi.NULL) 1913 1914 result = [] 1915 for i in range(_lib.sk_X509_num(cert_stack)): 1916 cert = _lib.sk_X509_value(cert_stack, i) 1917 _openssl_assert(cert != _ffi.NULL) 1918 pycert = X509._from_raw_x509_ptr(cert) 1919 result.append(pycert) 1920 1921 # Free the stack but not the members which are freed by the X509 class. 1922 _lib.sk_X509_free(cert_stack) 1923 self._cleanup() 1924 return result 1925 1926 1927def load_certificate(type, buffer): 1928 """ 1929 Load a certificate (X509) from the string *buffer* encoded with the 1930 type *type*. 1931 1932 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1) 1933 1934 :param bytes buffer: The buffer the certificate is stored in 1935 1936 :return: The X509 object 1937 """ 1938 if isinstance(buffer, _text_type): 1939 buffer = buffer.encode("ascii") 1940 1941 bio = _new_mem_buf(buffer) 1942 1943 if type == FILETYPE_PEM: 1944 x509 = _lib.PEM_read_bio_X509(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL) 1945 elif type == FILETYPE_ASN1: 1946 x509 = _lib.d2i_X509_bio(bio, _ffi.NULL) 1947 else: 1948 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1") 1949 1950 if x509 == _ffi.NULL: 1951 _raise_current_error() 1952 1953 return X509._from_raw_x509_ptr(x509) 1954 1955 1956def dump_certificate(type, cert): 1957 """ 1958 Dump the certificate *cert* into a buffer string encoded with the type 1959 *type*. 1960 1961 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1, or 1962 FILETYPE_TEXT) 1963 :param cert: The certificate to dump 1964 :return: The buffer with the dumped certificate in 1965 """ 1966 bio = _new_mem_buf() 1967 1968 if type == FILETYPE_PEM: 1969 result_code = _lib.PEM_write_bio_X509(bio, cert._x509) 1970 elif type == FILETYPE_ASN1: 1971 result_code = _lib.i2d_X509_bio(bio, cert._x509) 1972 elif type == FILETYPE_TEXT: 1973 result_code = _lib.X509_print_ex(bio, cert._x509, 0, 0) 1974 else: 1975 raise ValueError( 1976 "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or " 1977 "FILETYPE_TEXT" 1978 ) 1979 1980 _openssl_assert(result_code == 1) 1981 return _bio_to_string(bio) 1982 1983 1984def dump_publickey(type, pkey): 1985 """ 1986 Dump a public key to a buffer. 1987 1988 :param type: The file type (one of :data:`FILETYPE_PEM` or 1989 :data:`FILETYPE_ASN1`). 1990 :param PKey pkey: The public key to dump 1991 :return: The buffer with the dumped key in it. 1992 :rtype: bytes 1993 """ 1994 bio = _new_mem_buf() 1995 if type == FILETYPE_PEM: 1996 write_bio = _lib.PEM_write_bio_PUBKEY 1997 elif type == FILETYPE_ASN1: 1998 write_bio = _lib.i2d_PUBKEY_bio 1999 else: 2000 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1") 2001 2002 result_code = write_bio(bio, pkey._pkey) 2003 if result_code != 1: # pragma: no cover 2004 _raise_current_error() 2005 2006 return _bio_to_string(bio) 2007 2008 2009def dump_privatekey(type, pkey, cipher=None, passphrase=None): 2010 """ 2011 Dump the private key *pkey* into a buffer string encoded with the type 2012 *type*. Optionally (if *type* is :const:`FILETYPE_PEM`) encrypting it 2013 using *cipher* and *passphrase*. 2014 2015 :param type: The file type (one of :const:`FILETYPE_PEM`, 2016 :const:`FILETYPE_ASN1`, or :const:`FILETYPE_TEXT`) 2017 :param PKey pkey: The PKey to dump 2018 :param cipher: (optional) if encrypted PEM format, the cipher to use 2019 :param passphrase: (optional) if encrypted PEM format, this can be either 2020 the passphrase to use, or a callback for providing the passphrase. 2021 2022 :return: The buffer with the dumped key in 2023 :rtype: bytes 2024 """ 2025 bio = _new_mem_buf() 2026 2027 if not isinstance(pkey, PKey): 2028 raise TypeError("pkey must be a PKey") 2029 2030 if cipher is not None: 2031 if passphrase is None: 2032 raise TypeError( 2033 "if a value is given for cipher " 2034 "one must also be given for passphrase" 2035 ) 2036 cipher_obj = _lib.EVP_get_cipherbyname(_byte_string(cipher)) 2037 if cipher_obj == _ffi.NULL: 2038 raise ValueError("Invalid cipher name") 2039 else: 2040 cipher_obj = _ffi.NULL 2041 2042 helper = _PassphraseHelper(type, passphrase) 2043 if type == FILETYPE_PEM: 2044 result_code = _lib.PEM_write_bio_PrivateKey( 2045 bio, 2046 pkey._pkey, 2047 cipher_obj, 2048 _ffi.NULL, 2049 0, 2050 helper.callback, 2051 helper.callback_args, 2052 ) 2053 helper.raise_if_problem() 2054 elif type == FILETYPE_ASN1: 2055 result_code = _lib.i2d_PrivateKey_bio(bio, pkey._pkey) 2056 elif type == FILETYPE_TEXT: 2057 if _lib.EVP_PKEY_id(pkey._pkey) != _lib.EVP_PKEY_RSA: 2058 raise TypeError("Only RSA keys are supported for FILETYPE_TEXT") 2059 2060 rsa = _ffi.gc(_lib.EVP_PKEY_get1_RSA(pkey._pkey), _lib.RSA_free) 2061 result_code = _lib.RSA_print(bio, rsa, 0) 2062 else: 2063 raise ValueError( 2064 "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or " 2065 "FILETYPE_TEXT" 2066 ) 2067 2068 _openssl_assert(result_code != 0) 2069 2070 return _bio_to_string(bio) 2071 2072 2073class Revoked(object): 2074 """ 2075 A certificate revocation. 2076 """ 2077 2078 # https://www.openssl.org/docs/manmaster/man5/x509v3_config.html#CRL-distribution-points 2079 # which differs from crl_reasons of crypto/x509v3/v3_enum.c that matches 2080 # OCSP_crl_reason_str. We use the latter, just like the command line 2081 # program. 2082 _crl_reasons = [ 2083 b"unspecified", 2084 b"keyCompromise", 2085 b"CACompromise", 2086 b"affiliationChanged", 2087 b"superseded", 2088 b"cessationOfOperation", 2089 b"certificateHold", 2090 # b"removeFromCRL", 2091 ] 2092 2093 def __init__(self): 2094 revoked = _lib.X509_REVOKED_new() 2095 self._revoked = _ffi.gc(revoked, _lib.X509_REVOKED_free) 2096 2097 def set_serial(self, hex_str): 2098 """ 2099 Set the serial number. 2100 2101 The serial number is formatted as a hexadecimal number encoded in 2102 ASCII. 2103 2104 :param bytes hex_str: The new serial number. 2105 2106 :return: ``None`` 2107 """ 2108 bignum_serial = _ffi.gc(_lib.BN_new(), _lib.BN_free) 2109 bignum_ptr = _ffi.new("BIGNUM**") 2110 bignum_ptr[0] = bignum_serial 2111 bn_result = _lib.BN_hex2bn(bignum_ptr, hex_str) 2112 if not bn_result: 2113 raise ValueError("bad hex string") 2114 2115 asn1_serial = _ffi.gc( 2116 _lib.BN_to_ASN1_INTEGER(bignum_serial, _ffi.NULL), 2117 _lib.ASN1_INTEGER_free, 2118 ) 2119 _lib.X509_REVOKED_set_serialNumber(self._revoked, asn1_serial) 2120 2121 def get_serial(self): 2122 """ 2123 Get the serial number. 2124 2125 The serial number is formatted as a hexadecimal number encoded in 2126 ASCII. 2127 2128 :return: The serial number. 2129 :rtype: bytes 2130 """ 2131 bio = _new_mem_buf() 2132 2133 asn1_int = _lib.X509_REVOKED_get0_serialNumber(self._revoked) 2134 _openssl_assert(asn1_int != _ffi.NULL) 2135 result = _lib.i2a_ASN1_INTEGER(bio, asn1_int) 2136 _openssl_assert(result >= 0) 2137 return _bio_to_string(bio) 2138 2139 def _delete_reason(self): 2140 for i in range(_lib.X509_REVOKED_get_ext_count(self._revoked)): 2141 ext = _lib.X509_REVOKED_get_ext(self._revoked, i) 2142 obj = _lib.X509_EXTENSION_get_object(ext) 2143 if _lib.OBJ_obj2nid(obj) == _lib.NID_crl_reason: 2144 _lib.X509_EXTENSION_free(ext) 2145 _lib.X509_REVOKED_delete_ext(self._revoked, i) 2146 break 2147 2148 def set_reason(self, reason): 2149 """ 2150 Set the reason of this revocation. 2151 2152 If :data:`reason` is ``None``, delete the reason instead. 2153 2154 :param reason: The reason string. 2155 :type reason: :class:`bytes` or :class:`NoneType` 2156 2157 :return: ``None`` 2158 2159 .. seealso:: 2160 2161 :meth:`all_reasons`, which gives you a list of all supported 2162 reasons which you might pass to this method. 2163 """ 2164 if reason is None: 2165 self._delete_reason() 2166 elif not isinstance(reason, bytes): 2167 raise TypeError("reason must be None or a byte string") 2168 else: 2169 reason = reason.lower().replace(b" ", b"") 2170 reason_code = [r.lower() for r in self._crl_reasons].index(reason) 2171 2172 new_reason_ext = _lib.ASN1_ENUMERATED_new() 2173 _openssl_assert(new_reason_ext != _ffi.NULL) 2174 new_reason_ext = _ffi.gc(new_reason_ext, _lib.ASN1_ENUMERATED_free) 2175 2176 set_result = _lib.ASN1_ENUMERATED_set(new_reason_ext, reason_code) 2177 _openssl_assert(set_result != _ffi.NULL) 2178 2179 self._delete_reason() 2180 add_result = _lib.X509_REVOKED_add1_ext_i2d( 2181 self._revoked, _lib.NID_crl_reason, new_reason_ext, 0, 0 2182 ) 2183 _openssl_assert(add_result == 1) 2184 2185 def get_reason(self): 2186 """ 2187 Get the reason of this revocation. 2188 2189 :return: The reason, or ``None`` if there is none. 2190 :rtype: bytes or NoneType 2191 2192 .. seealso:: 2193 2194 :meth:`all_reasons`, which gives you a list of all supported 2195 reasons this method might return. 2196 """ 2197 for i in range(_lib.X509_REVOKED_get_ext_count(self._revoked)): 2198 ext = _lib.X509_REVOKED_get_ext(self._revoked, i) 2199 obj = _lib.X509_EXTENSION_get_object(ext) 2200 if _lib.OBJ_obj2nid(obj) == _lib.NID_crl_reason: 2201 bio = _new_mem_buf() 2202 2203 print_result = _lib.X509V3_EXT_print(bio, ext, 0, 0) 2204 if not print_result: 2205 print_result = _lib.M_ASN1_OCTET_STRING_print( 2206 bio, _lib.X509_EXTENSION_get_data(ext) 2207 ) 2208 _openssl_assert(print_result != 0) 2209 2210 return _bio_to_string(bio) 2211 2212 def all_reasons(self): 2213 """ 2214 Return a list of all the supported reason strings. 2215 2216 This list is a copy; modifying it does not change the supported reason 2217 strings. 2218 2219 :return: A list of reason strings. 2220 :rtype: :class:`list` of :class:`bytes` 2221 """ 2222 return self._crl_reasons[:] 2223 2224 def set_rev_date(self, when): 2225 """ 2226 Set the revocation timestamp. 2227 2228 :param bytes when: The timestamp of the revocation, 2229 as ASN.1 TIME. 2230 :return: ``None`` 2231 """ 2232 dt = _lib.X509_REVOKED_get0_revocationDate(self._revoked) 2233 return _set_asn1_time(dt, when) 2234 2235 def get_rev_date(self): 2236 """ 2237 Get the revocation timestamp. 2238 2239 :return: The timestamp of the revocation, as ASN.1 TIME. 2240 :rtype: bytes 2241 """ 2242 dt = _lib.X509_REVOKED_get0_revocationDate(self._revoked) 2243 return _get_asn1_time(dt) 2244 2245 2246class CRL(object): 2247 """ 2248 A certificate revocation list. 2249 """ 2250 2251 def __init__(self): 2252 crl = _lib.X509_CRL_new() 2253 self._crl = _ffi.gc(crl, _lib.X509_CRL_free) 2254 2255 def to_cryptography(self): 2256 """ 2257 Export as a ``cryptography`` CRL. 2258 2259 :rtype: ``cryptography.x509.CertificateRevocationList`` 2260 2261 .. versionadded:: 17.1.0 2262 """ 2263 from cryptography.hazmat.backends.openssl.x509 import ( 2264 _CertificateRevocationList, 2265 ) 2266 2267 backend = _get_backend() 2268 return _CertificateRevocationList(backend, self._crl) 2269 2270 @classmethod 2271 def from_cryptography(cls, crypto_crl): 2272 """ 2273 Construct based on a ``cryptography`` *crypto_crl*. 2274 2275 :param crypto_crl: A ``cryptography`` certificate revocation list 2276 :type crypto_crl: ``cryptography.x509.CertificateRevocationList`` 2277 2278 :rtype: CRL 2279 2280 .. versionadded:: 17.1.0 2281 """ 2282 if not isinstance(crypto_crl, x509.CertificateRevocationList): 2283 raise TypeError("Must be a certificate revocation list") 2284 2285 crl = cls() 2286 crl._crl = crypto_crl._x509_crl 2287 return crl 2288 2289 def get_revoked(self): 2290 """ 2291 Return the revocations in this certificate revocation list. 2292 2293 These revocations will be provided by value, not by reference. 2294 That means it's okay to mutate them: it won't affect this CRL. 2295 2296 :return: The revocations in this CRL. 2297 :rtype: :class:`tuple` of :class:`Revocation` 2298 """ 2299 results = [] 2300 revoked_stack = _lib.X509_CRL_get_REVOKED(self._crl) 2301 for i in range(_lib.sk_X509_REVOKED_num(revoked_stack)): 2302 revoked = _lib.sk_X509_REVOKED_value(revoked_stack, i) 2303 revoked_copy = _lib.Cryptography_X509_REVOKED_dup(revoked) 2304 pyrev = Revoked.__new__(Revoked) 2305 pyrev._revoked = _ffi.gc(revoked_copy, _lib.X509_REVOKED_free) 2306 results.append(pyrev) 2307 if results: 2308 return tuple(results) 2309 2310 def add_revoked(self, revoked): 2311 """ 2312 Add a revoked (by value not reference) to the CRL structure 2313 2314 This revocation will be added by value, not by reference. That 2315 means it's okay to mutate it after adding: it won't affect 2316 this CRL. 2317 2318 :param Revoked revoked: The new revocation. 2319 :return: ``None`` 2320 """ 2321 copy = _lib.Cryptography_X509_REVOKED_dup(revoked._revoked) 2322 _openssl_assert(copy != _ffi.NULL) 2323 2324 add_result = _lib.X509_CRL_add0_revoked(self._crl, copy) 2325 _openssl_assert(add_result != 0) 2326 2327 def get_issuer(self): 2328 """ 2329 Get the CRL's issuer. 2330 2331 .. versionadded:: 16.1.0 2332 2333 :rtype: X509Name 2334 """ 2335 _issuer = _lib.X509_NAME_dup(_lib.X509_CRL_get_issuer(self._crl)) 2336 _openssl_assert(_issuer != _ffi.NULL) 2337 _issuer = _ffi.gc(_issuer, _lib.X509_NAME_free) 2338 issuer = X509Name.__new__(X509Name) 2339 issuer._name = _issuer 2340 return issuer 2341 2342 def set_version(self, version): 2343 """ 2344 Set the CRL version. 2345 2346 .. versionadded:: 16.1.0 2347 2348 :param int version: The version of the CRL. 2349 :return: ``None`` 2350 """ 2351 _openssl_assert(_lib.X509_CRL_set_version(self._crl, version) != 0) 2352 2353 def _set_boundary_time(self, which, when): 2354 return _set_asn1_time(which(self._crl), when) 2355 2356 def set_lastUpdate(self, when): 2357 """ 2358 Set when the CRL was last updated. 2359 2360 The timestamp is formatted as an ASN.1 TIME:: 2361 2362 YYYYMMDDhhmmssZ 2363 2364 .. versionadded:: 16.1.0 2365 2366 :param bytes when: A timestamp string. 2367 :return: ``None`` 2368 """ 2369 return self._set_boundary_time(_lib.X509_CRL_get_lastUpdate, when) 2370 2371 def set_nextUpdate(self, when): 2372 """ 2373 Set when the CRL will next be updated. 2374 2375 The timestamp is formatted as an ASN.1 TIME:: 2376 2377 YYYYMMDDhhmmssZ 2378 2379 .. versionadded:: 16.1.0 2380 2381 :param bytes when: A timestamp string. 2382 :return: ``None`` 2383 """ 2384 return self._set_boundary_time(_lib.X509_CRL_get_nextUpdate, when) 2385 2386 def sign(self, issuer_cert, issuer_key, digest): 2387 """ 2388 Sign the CRL. 2389 2390 Signing a CRL enables clients to associate the CRL itself with an 2391 issuer. Before a CRL is meaningful to other OpenSSL functions, it must 2392 be signed by an issuer. 2393 2394 This method implicitly sets the issuer's name based on the issuer 2395 certificate and private key used to sign the CRL. 2396 2397 .. versionadded:: 16.1.0 2398 2399 :param X509 issuer_cert: The issuer's certificate. 2400 :param PKey issuer_key: The issuer's private key. 2401 :param bytes digest: The digest method to sign the CRL with. 2402 """ 2403 digest_obj = _lib.EVP_get_digestbyname(digest) 2404 _openssl_assert(digest_obj != _ffi.NULL) 2405 _lib.X509_CRL_set_issuer_name( 2406 self._crl, _lib.X509_get_subject_name(issuer_cert._x509) 2407 ) 2408 _lib.X509_CRL_sort(self._crl) 2409 result = _lib.X509_CRL_sign(self._crl, issuer_key._pkey, digest_obj) 2410 _openssl_assert(result != 0) 2411 2412 def export( 2413 self, cert, key, type=FILETYPE_PEM, days=100, digest=_UNSPECIFIED 2414 ): 2415 """ 2416 Export the CRL as a string. 2417 2418 :param X509 cert: The certificate used to sign the CRL. 2419 :param PKey key: The key used to sign the CRL. 2420 :param int type: The export format, either :data:`FILETYPE_PEM`, 2421 :data:`FILETYPE_ASN1`, or :data:`FILETYPE_TEXT`. 2422 :param int days: The number of days until the next update of this CRL. 2423 :param bytes digest: The name of the message digest to use (eg 2424 ``b"sha256"``). 2425 :rtype: bytes 2426 """ 2427 2428 if not isinstance(cert, X509): 2429 raise TypeError("cert must be an X509 instance") 2430 if not isinstance(key, PKey): 2431 raise TypeError("key must be a PKey instance") 2432 if not isinstance(type, int): 2433 raise TypeError("type must be an integer") 2434 2435 if digest is _UNSPECIFIED: 2436 raise TypeError("digest must be provided") 2437 2438 digest_obj = _lib.EVP_get_digestbyname(digest) 2439 if digest_obj == _ffi.NULL: 2440 raise ValueError("No such digest method") 2441 2442 bio = _lib.BIO_new(_lib.BIO_s_mem()) 2443 _openssl_assert(bio != _ffi.NULL) 2444 2445 # A scratch time object to give different values to different CRL 2446 # fields 2447 sometime = _lib.ASN1_TIME_new() 2448 _openssl_assert(sometime != _ffi.NULL) 2449 2450 _lib.X509_gmtime_adj(sometime, 0) 2451 _lib.X509_CRL_set_lastUpdate(self._crl, sometime) 2452 2453 _lib.X509_gmtime_adj(sometime, days * 24 * 60 * 60) 2454 _lib.X509_CRL_set_nextUpdate(self._crl, sometime) 2455 2456 _lib.X509_CRL_set_issuer_name( 2457 self._crl, _lib.X509_get_subject_name(cert._x509) 2458 ) 2459 2460 sign_result = _lib.X509_CRL_sign(self._crl, key._pkey, digest_obj) 2461 if not sign_result: 2462 _raise_current_error() 2463 2464 return dump_crl(type, self) 2465 2466 2467class PKCS7(object): 2468 def type_is_signed(self): 2469 """ 2470 Check if this NID_pkcs7_signed object 2471 2472 :return: True if the PKCS7 is of type signed 2473 """ 2474 return bool(_lib.PKCS7_type_is_signed(self._pkcs7)) 2475 2476 def type_is_enveloped(self): 2477 """ 2478 Check if this NID_pkcs7_enveloped object 2479 2480 :returns: True if the PKCS7 is of type enveloped 2481 """ 2482 return bool(_lib.PKCS7_type_is_enveloped(self._pkcs7)) 2483 2484 def type_is_signedAndEnveloped(self): 2485 """ 2486 Check if this NID_pkcs7_signedAndEnveloped object 2487 2488 :returns: True if the PKCS7 is of type signedAndEnveloped 2489 """ 2490 return bool(_lib.PKCS7_type_is_signedAndEnveloped(self._pkcs7)) 2491 2492 def type_is_data(self): 2493 """ 2494 Check if this NID_pkcs7_data object 2495 2496 :return: True if the PKCS7 is of type data 2497 """ 2498 return bool(_lib.PKCS7_type_is_data(self._pkcs7)) 2499 2500 def get_type_name(self): 2501 """ 2502 Returns the type name of the PKCS7 structure 2503 2504 :return: A string with the typename 2505 """ 2506 nid = _lib.OBJ_obj2nid(self._pkcs7.type) 2507 string_type = _lib.OBJ_nid2sn(nid) 2508 return _ffi.string(string_type) 2509 2510 2511class PKCS12(object): 2512 """ 2513 A PKCS #12 archive. 2514 """ 2515 2516 def __init__(self): 2517 self._pkey = None 2518 self._cert = None 2519 self._cacerts = None 2520 self._friendlyname = None 2521 2522 def get_certificate(self): 2523 """ 2524 Get the certificate in the PKCS #12 structure. 2525 2526 :return: The certificate, or :py:const:`None` if there is none. 2527 :rtype: :py:class:`X509` or :py:const:`None` 2528 """ 2529 return self._cert 2530 2531 def set_certificate(self, cert): 2532 """ 2533 Set the certificate in the PKCS #12 structure. 2534 2535 :param cert: The new certificate, or :py:const:`None` to unset it. 2536 :type cert: :py:class:`X509` or :py:const:`None` 2537 2538 :return: ``None`` 2539 """ 2540 if not isinstance(cert, X509): 2541 raise TypeError("cert must be an X509 instance") 2542 self._cert = cert 2543 2544 def get_privatekey(self): 2545 """ 2546 Get the private key in the PKCS #12 structure. 2547 2548 :return: The private key, or :py:const:`None` if there is none. 2549 :rtype: :py:class:`PKey` 2550 """ 2551 return self._pkey 2552 2553 def set_privatekey(self, pkey): 2554 """ 2555 Set the certificate portion of the PKCS #12 structure. 2556 2557 :param pkey: The new private key, or :py:const:`None` to unset it. 2558 :type pkey: :py:class:`PKey` or :py:const:`None` 2559 2560 :return: ``None`` 2561 """ 2562 if not isinstance(pkey, PKey): 2563 raise TypeError("pkey must be a PKey instance") 2564 self._pkey = pkey 2565 2566 def get_ca_certificates(self): 2567 """ 2568 Get the CA certificates in the PKCS #12 structure. 2569 2570 :return: A tuple with the CA certificates in the chain, or 2571 :py:const:`None` if there are none. 2572 :rtype: :py:class:`tuple` of :py:class:`X509` or :py:const:`None` 2573 """ 2574 if self._cacerts is not None: 2575 return tuple(self._cacerts) 2576 2577 def set_ca_certificates(self, cacerts): 2578 """ 2579 Replace or set the CA certificates within the PKCS12 object. 2580 2581 :param cacerts: The new CA certificates, or :py:const:`None` to unset 2582 them. 2583 :type cacerts: An iterable of :py:class:`X509` or :py:const:`None` 2584 2585 :return: ``None`` 2586 """ 2587 if cacerts is None: 2588 self._cacerts = None 2589 else: 2590 cacerts = list(cacerts) 2591 for cert in cacerts: 2592 if not isinstance(cert, X509): 2593 raise TypeError( 2594 "iterable must only contain X509 instances" 2595 ) 2596 self._cacerts = cacerts 2597 2598 def set_friendlyname(self, name): 2599 """ 2600 Set the friendly name in the PKCS #12 structure. 2601 2602 :param name: The new friendly name, or :py:const:`None` to unset. 2603 :type name: :py:class:`bytes` or :py:const:`None` 2604 2605 :return: ``None`` 2606 """ 2607 if name is None: 2608 self._friendlyname = None 2609 elif not isinstance(name, bytes): 2610 raise TypeError( 2611 "name must be a byte string or None (not %r)" % (name,) 2612 ) 2613 self._friendlyname = name 2614 2615 def get_friendlyname(self): 2616 """ 2617 Get the friendly name in the PKCS# 12 structure. 2618 2619 :returns: The friendly name, or :py:const:`None` if there is none. 2620 :rtype: :py:class:`bytes` or :py:const:`None` 2621 """ 2622 return self._friendlyname 2623 2624 def export(self, passphrase=None, iter=2048, maciter=1): 2625 """ 2626 Dump a PKCS12 object as a string. 2627 2628 For more information, see the :c:func:`PKCS12_create` man page. 2629 2630 :param passphrase: The passphrase used to encrypt the structure. Unlike 2631 some other passphrase arguments, this *must* be a string, not a 2632 callback. 2633 :type passphrase: :py:data:`bytes` 2634 2635 :param iter: Number of times to repeat the encryption step. 2636 :type iter: :py:data:`int` 2637 2638 :param maciter: Number of times to repeat the MAC step. 2639 :type maciter: :py:data:`int` 2640 2641 :return: The string representation of the PKCS #12 structure. 2642 :rtype: 2643 """ 2644 passphrase = _text_to_bytes_and_warn("passphrase", passphrase) 2645 2646 if self._cacerts is None: 2647 cacerts = _ffi.NULL 2648 else: 2649 cacerts = _lib.sk_X509_new_null() 2650 cacerts = _ffi.gc(cacerts, _lib.sk_X509_free) 2651 for cert in self._cacerts: 2652 _lib.sk_X509_push(cacerts, cert._x509) 2653 2654 if passphrase is None: 2655 passphrase = _ffi.NULL 2656 2657 friendlyname = self._friendlyname 2658 if friendlyname is None: 2659 friendlyname = _ffi.NULL 2660 2661 if self._pkey is None: 2662 pkey = _ffi.NULL 2663 else: 2664 pkey = self._pkey._pkey 2665 2666 if self._cert is None: 2667 cert = _ffi.NULL 2668 else: 2669 cert = self._cert._x509 2670 2671 pkcs12 = _lib.PKCS12_create( 2672 passphrase, 2673 friendlyname, 2674 pkey, 2675 cert, 2676 cacerts, 2677 _lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC, 2678 _lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC, 2679 iter, 2680 maciter, 2681 0, 2682 ) 2683 if pkcs12 == _ffi.NULL: 2684 _raise_current_error() 2685 pkcs12 = _ffi.gc(pkcs12, _lib.PKCS12_free) 2686 2687 bio = _new_mem_buf() 2688 _lib.i2d_PKCS12_bio(bio, pkcs12) 2689 return _bio_to_string(bio) 2690 2691 2692class NetscapeSPKI(object): 2693 """ 2694 A Netscape SPKI object. 2695 """ 2696 2697 def __init__(self): 2698 spki = _lib.NETSCAPE_SPKI_new() 2699 self._spki = _ffi.gc(spki, _lib.NETSCAPE_SPKI_free) 2700 2701 def sign(self, pkey, digest): 2702 """ 2703 Sign the certificate request with this key and digest type. 2704 2705 :param pkey: The private key to sign with. 2706 :type pkey: :py:class:`PKey` 2707 2708 :param digest: The message digest to use. 2709 :type digest: :py:class:`bytes` 2710 2711 :return: ``None`` 2712 """ 2713 if pkey._only_public: 2714 raise ValueError("Key has only public part") 2715 2716 if not pkey._initialized: 2717 raise ValueError("Key is uninitialized") 2718 2719 digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest)) 2720 if digest_obj == _ffi.NULL: 2721 raise ValueError("No such digest method") 2722 2723 sign_result = _lib.NETSCAPE_SPKI_sign( 2724 self._spki, pkey._pkey, digest_obj 2725 ) 2726 _openssl_assert(sign_result > 0) 2727 2728 def verify(self, key): 2729 """ 2730 Verifies a signature on a certificate request. 2731 2732 :param PKey key: The public key that signature is supposedly from. 2733 2734 :return: ``True`` if the signature is correct. 2735 :rtype: bool 2736 2737 :raises OpenSSL.crypto.Error: If the signature is invalid, or there was 2738 a problem verifying the signature. 2739 """ 2740 answer = _lib.NETSCAPE_SPKI_verify(self._spki, key._pkey) 2741 if answer <= 0: 2742 _raise_current_error() 2743 return True 2744 2745 def b64_encode(self): 2746 """ 2747 Generate a base64 encoded representation of this SPKI object. 2748 2749 :return: The base64 encoded string. 2750 :rtype: :py:class:`bytes` 2751 """ 2752 encoded = _lib.NETSCAPE_SPKI_b64_encode(self._spki) 2753 result = _ffi.string(encoded) 2754 _lib.OPENSSL_free(encoded) 2755 return result 2756 2757 def get_pubkey(self): 2758 """ 2759 Get the public key of this certificate. 2760 2761 :return: The public key. 2762 :rtype: :py:class:`PKey` 2763 """ 2764 pkey = PKey.__new__(PKey) 2765 pkey._pkey = _lib.NETSCAPE_SPKI_get_pubkey(self._spki) 2766 _openssl_assert(pkey._pkey != _ffi.NULL) 2767 pkey._pkey = _ffi.gc(pkey._pkey, _lib.EVP_PKEY_free) 2768 pkey._only_public = True 2769 return pkey 2770 2771 def set_pubkey(self, pkey): 2772 """ 2773 Set the public key of the certificate 2774 2775 :param pkey: The public key 2776 :return: ``None`` 2777 """ 2778 set_result = _lib.NETSCAPE_SPKI_set_pubkey(self._spki, pkey._pkey) 2779 _openssl_assert(set_result == 1) 2780 2781 2782class _PassphraseHelper(object): 2783 def __init__(self, type, passphrase, more_args=False, truncate=False): 2784 if type != FILETYPE_PEM and passphrase is not None: 2785 raise ValueError( 2786 "only FILETYPE_PEM key format supports encryption" 2787 ) 2788 self._passphrase = passphrase 2789 self._more_args = more_args 2790 self._truncate = truncate 2791 self._problems = [] 2792 2793 @property 2794 def callback(self): 2795 if self._passphrase is None: 2796 return _ffi.NULL 2797 elif isinstance(self._passphrase, bytes) or callable(self._passphrase): 2798 return _ffi.callback("pem_password_cb", self._read_passphrase) 2799 else: 2800 raise TypeError( 2801 "Last argument must be a byte string or a callable." 2802 ) 2803 2804 @property 2805 def callback_args(self): 2806 if self._passphrase is None: 2807 return _ffi.NULL 2808 elif isinstance(self._passphrase, bytes) or callable(self._passphrase): 2809 return _ffi.NULL 2810 else: 2811 raise TypeError( 2812 "Last argument must be a byte string or a callable." 2813 ) 2814 2815 def raise_if_problem(self, exceptionType=Error): 2816 if self._problems: 2817 2818 # Flush the OpenSSL error queue 2819 try: 2820 _exception_from_error_queue(exceptionType) 2821 except exceptionType: 2822 pass 2823 2824 raise self._problems.pop(0) 2825 2826 def _read_passphrase(self, buf, size, rwflag, userdata): 2827 try: 2828 if callable(self._passphrase): 2829 if self._more_args: 2830 result = self._passphrase(size, rwflag, userdata) 2831 else: 2832 result = self._passphrase(rwflag) 2833 else: 2834 result = self._passphrase 2835 if not isinstance(result, bytes): 2836 raise ValueError("Bytes expected") 2837 if len(result) > size: 2838 if self._truncate: 2839 result = result[:size] 2840 else: 2841 raise ValueError( 2842 "passphrase returned by callback is too long" 2843 ) 2844 for i in range(len(result)): 2845 buf[i] = result[i : i + 1] 2846 return len(result) 2847 except Exception as e: 2848 self._problems.append(e) 2849 return 0 2850 2851 2852def load_publickey(type, buffer): 2853 """ 2854 Load a public key from a buffer. 2855 2856 :param type: The file type (one of :data:`FILETYPE_PEM`, 2857 :data:`FILETYPE_ASN1`). 2858 :param buffer: The buffer the key is stored in. 2859 :type buffer: A Python string object, either unicode or bytestring. 2860 :return: The PKey object. 2861 :rtype: :class:`PKey` 2862 """ 2863 if isinstance(buffer, _text_type): 2864 buffer = buffer.encode("ascii") 2865 2866 bio = _new_mem_buf(buffer) 2867 2868 if type == FILETYPE_PEM: 2869 evp_pkey = _lib.PEM_read_bio_PUBKEY( 2870 bio, _ffi.NULL, _ffi.NULL, _ffi.NULL 2871 ) 2872 elif type == FILETYPE_ASN1: 2873 evp_pkey = _lib.d2i_PUBKEY_bio(bio, _ffi.NULL) 2874 else: 2875 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1") 2876 2877 if evp_pkey == _ffi.NULL: 2878 _raise_current_error() 2879 2880 pkey = PKey.__new__(PKey) 2881 pkey._pkey = _ffi.gc(evp_pkey, _lib.EVP_PKEY_free) 2882 pkey._only_public = True 2883 return pkey 2884 2885 2886def load_privatekey(type, buffer, passphrase=None): 2887 """ 2888 Load a private key (PKey) from the string *buffer* encoded with the type 2889 *type*. 2890 2891 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1) 2892 :param buffer: The buffer the key is stored in 2893 :param passphrase: (optional) if encrypted PEM format, this can be 2894 either the passphrase to use, or a callback for 2895 providing the passphrase. 2896 2897 :return: The PKey object 2898 """ 2899 if isinstance(buffer, _text_type): 2900 buffer = buffer.encode("ascii") 2901 2902 bio = _new_mem_buf(buffer) 2903 2904 helper = _PassphraseHelper(type, passphrase) 2905 if type == FILETYPE_PEM: 2906 evp_pkey = _lib.PEM_read_bio_PrivateKey( 2907 bio, _ffi.NULL, helper.callback, helper.callback_args 2908 ) 2909 helper.raise_if_problem() 2910 elif type == FILETYPE_ASN1: 2911 evp_pkey = _lib.d2i_PrivateKey_bio(bio, _ffi.NULL) 2912 else: 2913 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1") 2914 2915 if evp_pkey == _ffi.NULL: 2916 _raise_current_error() 2917 2918 pkey = PKey.__new__(PKey) 2919 pkey._pkey = _ffi.gc(evp_pkey, _lib.EVP_PKEY_free) 2920 return pkey 2921 2922 2923def dump_certificate_request(type, req): 2924 """ 2925 Dump the certificate request *req* into a buffer string encoded with the 2926 type *type*. 2927 2928 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1) 2929 :param req: The certificate request to dump 2930 :return: The buffer with the dumped certificate request in 2931 """ 2932 bio = _new_mem_buf() 2933 2934 if type == FILETYPE_PEM: 2935 result_code = _lib.PEM_write_bio_X509_REQ(bio, req._req) 2936 elif type == FILETYPE_ASN1: 2937 result_code = _lib.i2d_X509_REQ_bio(bio, req._req) 2938 elif type == FILETYPE_TEXT: 2939 result_code = _lib.X509_REQ_print_ex(bio, req._req, 0, 0) 2940 else: 2941 raise ValueError( 2942 "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or " 2943 "FILETYPE_TEXT" 2944 ) 2945 2946 _openssl_assert(result_code != 0) 2947 2948 return _bio_to_string(bio) 2949 2950 2951def load_certificate_request(type, buffer): 2952 """ 2953 Load a certificate request (X509Req) from the string *buffer* encoded with 2954 the type *type*. 2955 2956 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1) 2957 :param buffer: The buffer the certificate request is stored in 2958 :return: The X509Req object 2959 """ 2960 if isinstance(buffer, _text_type): 2961 buffer = buffer.encode("ascii") 2962 2963 bio = _new_mem_buf(buffer) 2964 2965 if type == FILETYPE_PEM: 2966 req = _lib.PEM_read_bio_X509_REQ(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL) 2967 elif type == FILETYPE_ASN1: 2968 req = _lib.d2i_X509_REQ_bio(bio, _ffi.NULL) 2969 else: 2970 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1") 2971 2972 _openssl_assert(req != _ffi.NULL) 2973 2974 x509req = X509Req.__new__(X509Req) 2975 x509req._req = _ffi.gc(req, _lib.X509_REQ_free) 2976 return x509req 2977 2978 2979def sign(pkey, data, digest): 2980 """ 2981 Sign a data string using the given key and message digest. 2982 2983 :param pkey: PKey to sign with 2984 :param data: data to be signed 2985 :param digest: message digest to use 2986 :return: signature 2987 2988 .. versionadded:: 0.11 2989 """ 2990 data = _text_to_bytes_and_warn("data", data) 2991 2992 digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest)) 2993 if digest_obj == _ffi.NULL: 2994 raise ValueError("No such digest method") 2995 2996 md_ctx = _lib.Cryptography_EVP_MD_CTX_new() 2997 md_ctx = _ffi.gc(md_ctx, _lib.Cryptography_EVP_MD_CTX_free) 2998 2999 _lib.EVP_SignInit(md_ctx, digest_obj) 3000 _lib.EVP_SignUpdate(md_ctx, data, len(data)) 3001 3002 length = _lib.EVP_PKEY_size(pkey._pkey) 3003 _openssl_assert(length > 0) 3004 signature_buffer = _ffi.new("unsigned char[]", length) 3005 signature_length = _ffi.new("unsigned int *") 3006 final_result = _lib.EVP_SignFinal( 3007 md_ctx, signature_buffer, signature_length, pkey._pkey 3008 ) 3009 _openssl_assert(final_result == 1) 3010 3011 return _ffi.buffer(signature_buffer, signature_length[0])[:] 3012 3013 3014def verify(cert, signature, data, digest): 3015 """ 3016 Verify the signature for a data string. 3017 3018 :param cert: signing certificate (X509 object) corresponding to the 3019 private key which generated the signature. 3020 :param signature: signature returned by sign function 3021 :param data: data to be verified 3022 :param digest: message digest to use 3023 :return: ``None`` if the signature is correct, raise exception otherwise. 3024 3025 .. versionadded:: 0.11 3026 """ 3027 data = _text_to_bytes_and_warn("data", data) 3028 3029 digest_obj = _lib.EVP_get_digestbyname(_byte_string(digest)) 3030 if digest_obj == _ffi.NULL: 3031 raise ValueError("No such digest method") 3032 3033 pkey = _lib.X509_get_pubkey(cert._x509) 3034 _openssl_assert(pkey != _ffi.NULL) 3035 pkey = _ffi.gc(pkey, _lib.EVP_PKEY_free) 3036 3037 md_ctx = _lib.Cryptography_EVP_MD_CTX_new() 3038 md_ctx = _ffi.gc(md_ctx, _lib.Cryptography_EVP_MD_CTX_free) 3039 3040 _lib.EVP_VerifyInit(md_ctx, digest_obj) 3041 _lib.EVP_VerifyUpdate(md_ctx, data, len(data)) 3042 verify_result = _lib.EVP_VerifyFinal( 3043 md_ctx, signature, len(signature), pkey 3044 ) 3045 3046 if verify_result != 1: 3047 _raise_current_error() 3048 3049 3050def dump_crl(type, crl): 3051 """ 3052 Dump a certificate revocation list to a buffer. 3053 3054 :param type: The file type (one of ``FILETYPE_PEM``, ``FILETYPE_ASN1``, or 3055 ``FILETYPE_TEXT``). 3056 :param CRL crl: The CRL to dump. 3057 3058 :return: The buffer with the CRL. 3059 :rtype: bytes 3060 """ 3061 bio = _new_mem_buf() 3062 3063 if type == FILETYPE_PEM: 3064 ret = _lib.PEM_write_bio_X509_CRL(bio, crl._crl) 3065 elif type == FILETYPE_ASN1: 3066 ret = _lib.i2d_X509_CRL_bio(bio, crl._crl) 3067 elif type == FILETYPE_TEXT: 3068 ret = _lib.X509_CRL_print(bio, crl._crl) 3069 else: 3070 raise ValueError( 3071 "type argument must be FILETYPE_PEM, FILETYPE_ASN1, or " 3072 "FILETYPE_TEXT" 3073 ) 3074 3075 _openssl_assert(ret == 1) 3076 return _bio_to_string(bio) 3077 3078 3079def load_crl(type, buffer): 3080 """ 3081 Load Certificate Revocation List (CRL) data from a string *buffer*. 3082 *buffer* encoded with the type *type*. 3083 3084 :param type: The file type (one of FILETYPE_PEM, FILETYPE_ASN1) 3085 :param buffer: The buffer the CRL is stored in 3086 3087 :return: The PKey object 3088 """ 3089 if isinstance(buffer, _text_type): 3090 buffer = buffer.encode("ascii") 3091 3092 bio = _new_mem_buf(buffer) 3093 3094 if type == FILETYPE_PEM: 3095 crl = _lib.PEM_read_bio_X509_CRL(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL) 3096 elif type == FILETYPE_ASN1: 3097 crl = _lib.d2i_X509_CRL_bio(bio, _ffi.NULL) 3098 else: 3099 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1") 3100 3101 if crl == _ffi.NULL: 3102 _raise_current_error() 3103 3104 result = CRL.__new__(CRL) 3105 result._crl = _ffi.gc(crl, _lib.X509_CRL_free) 3106 return result 3107 3108 3109def load_pkcs7_data(type, buffer): 3110 """ 3111 Load pkcs7 data from the string *buffer* encoded with the type 3112 *type*. 3113 3114 :param type: The file type (one of FILETYPE_PEM or FILETYPE_ASN1) 3115 :param buffer: The buffer with the pkcs7 data. 3116 :return: The PKCS7 object 3117 """ 3118 if isinstance(buffer, _text_type): 3119 buffer = buffer.encode("ascii") 3120 3121 bio = _new_mem_buf(buffer) 3122 3123 if type == FILETYPE_PEM: 3124 pkcs7 = _lib.PEM_read_bio_PKCS7(bio, _ffi.NULL, _ffi.NULL, _ffi.NULL) 3125 elif type == FILETYPE_ASN1: 3126 pkcs7 = _lib.d2i_PKCS7_bio(bio, _ffi.NULL) 3127 else: 3128 raise ValueError("type argument must be FILETYPE_PEM or FILETYPE_ASN1") 3129 3130 if pkcs7 == _ffi.NULL: 3131 _raise_current_error() 3132 3133 pypkcs7 = PKCS7.__new__(PKCS7) 3134 pypkcs7._pkcs7 = _ffi.gc(pkcs7, _lib.PKCS7_free) 3135 return pypkcs7 3136 3137 3138load_pkcs7_data = utils.deprecated( 3139 load_pkcs7_data, 3140 __name__, 3141 ( 3142 "PKCS#7 support in pyOpenSSL is deprecated. You should use the APIs " 3143 "in cryptography." 3144 ), 3145 DeprecationWarning, 3146) 3147 3148 3149def load_pkcs12(buffer, passphrase=None): 3150 """ 3151 Load pkcs12 data from the string *buffer*. If the pkcs12 structure is 3152 encrypted, a *passphrase* must be included. The MAC is always 3153 checked and thus required. 3154 3155 See also the man page for the C function :py:func:`PKCS12_parse`. 3156 3157 :param buffer: The buffer the certificate is stored in 3158 :param passphrase: (Optional) The password to decrypt the PKCS12 lump 3159 :returns: The PKCS12 object 3160 """ 3161 passphrase = _text_to_bytes_and_warn("passphrase", passphrase) 3162 3163 if isinstance(buffer, _text_type): 3164 buffer = buffer.encode("ascii") 3165 3166 bio = _new_mem_buf(buffer) 3167 3168 # Use null passphrase if passphrase is None or empty string. With PKCS#12 3169 # password based encryption no password and a zero length password are two 3170 # different things, but OpenSSL implementation will try both to figure out 3171 # which one works. 3172 if not passphrase: 3173 passphrase = _ffi.NULL 3174 3175 p12 = _lib.d2i_PKCS12_bio(bio, _ffi.NULL) 3176 if p12 == _ffi.NULL: 3177 _raise_current_error() 3178 p12 = _ffi.gc(p12, _lib.PKCS12_free) 3179 3180 pkey = _ffi.new("EVP_PKEY**") 3181 cert = _ffi.new("X509**") 3182 cacerts = _ffi.new("Cryptography_STACK_OF_X509**") 3183 3184 parse_result = _lib.PKCS12_parse(p12, passphrase, pkey, cert, cacerts) 3185 if not parse_result: 3186 _raise_current_error() 3187 3188 cacerts = _ffi.gc(cacerts[0], _lib.sk_X509_free) 3189 3190 # openssl 1.0.0 sometimes leaves an X509_check_private_key error in the 3191 # queue for no particular reason. This error isn't interesting to anyone 3192 # outside this function. It's not even interesting to us. Get rid of it. 3193 try: 3194 _raise_current_error() 3195 except Error: 3196 pass 3197 3198 if pkey[0] == _ffi.NULL: 3199 pykey = None 3200 else: 3201 pykey = PKey.__new__(PKey) 3202 pykey._pkey = _ffi.gc(pkey[0], _lib.EVP_PKEY_free) 3203 3204 if cert[0] == _ffi.NULL: 3205 pycert = None 3206 friendlyname = None 3207 else: 3208 pycert = X509._from_raw_x509_ptr(cert[0]) 3209 3210 friendlyname_length = _ffi.new("int*") 3211 friendlyname_buffer = _lib.X509_alias_get0( 3212 cert[0], friendlyname_length 3213 ) 3214 friendlyname = _ffi.buffer( 3215 friendlyname_buffer, friendlyname_length[0] 3216 )[:] 3217 if friendlyname_buffer == _ffi.NULL: 3218 friendlyname = None 3219 3220 pycacerts = [] 3221 for i in range(_lib.sk_X509_num(cacerts)): 3222 x509 = _lib.sk_X509_value(cacerts, i) 3223 pycacert = X509._from_raw_x509_ptr(x509) 3224 pycacerts.append(pycacert) 3225 if not pycacerts: 3226 pycacerts = None 3227 3228 pkcs12 = PKCS12.__new__(PKCS12) 3229 pkcs12._pkey = pykey 3230 pkcs12._cert = pycert 3231 pkcs12._cacerts = pycacerts 3232 pkcs12._friendlyname = friendlyname 3233 return pkcs12 3234 3235 3236load_pkcs12 = utils.deprecated( 3237 load_pkcs12, 3238 __name__, 3239 ( 3240 "PKCS#12 support in pyOpenSSL is deprecated. You should use the APIs " 3241 "in cryptography." 3242 ), 3243 DeprecationWarning, 3244) 3245 3246 3247# There are no direct unit tests for this initialization. It is tested 3248# indirectly since it is necessary for functions like dump_privatekey when 3249# using encryption. 3250# 3251# Thus OpenSSL.test.test_crypto.FunctionTests.test_dump_privatekey_passphrase 3252# and some other similar tests may fail without this (though they may not if 3253# the Python runtime has already done some initialization of the underlying 3254# OpenSSL library (and is linked against the same one that cryptography is 3255# using)). 3256_lib.OpenSSL_add_all_algorithms() 3257 3258# This is similar but exercised mainly by exception_from_error_queue. It calls 3259# both ERR_load_crypto_strings() and ERR_load_SSL_strings(). 3260_lib.SSL_load_error_strings() 3261 3262 3263# Set the default string mask to match OpenSSL upstream (since 2005) and 3264# RFC5280 recommendations. 3265_lib.ASN1_STRING_set_default_mask_asc(b"utf8only") 3266