# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.

from __future__ import absolute_import, division, print_function

import datetime
import operator
import warnings

from cryptography import utils, x509
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.backends.openssl.decode_asn1 import (
    _CERTIFICATE_EXTENSION_PARSER, _CERTIFICATE_EXTENSION_PARSER_NO_SCT,
    _CRL_EXTENSION_PARSER, _CSR_EXTENSION_PARSER,
    _REVOKED_CERTIFICATE_EXTENSION_PARSER, _asn1_integer_to_int,
    _asn1_string_to_bytes, _decode_x509_name, _obj2txt, _parse_asn1_time
)
from cryptography.hazmat.backends.openssl.encode_asn1 import (
    _encode_asn1_int_gc
)
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import dsa, ec, rsa


@utils.register_interface(x509.Certificate)
class _Certificate(object):
    """``x509.Certificate`` implementation wrapping a backend X509 handle."""

    def __init__(self, backend, x509):
        # NOTE: the ``x509`` parameter shadows the module of the same name
        # inside this method only; it is the handle passed to the X509_* calls.
        self._backend = backend
        self._x509 = x509

    def __repr__(self):
        return "<Certificate(subject={0}, ...)>".format(self.subject)

    def __eq__(self, other):
        """Compare two certificates with ``X509_cmp``.

        Returns ``NotImplemented`` when ``other`` is not an
        ``x509.Certificate``.
        """
        if not isinstance(other, x509.Certificate):
            return NotImplemented

        res = self._backend._lib.X509_cmp(self._x509, other._x509)
        return res == 0

    def __ne__(self, other):
        return not self == other

    def __hash__(self):
        # Hash the DER serialization of the certificate.
        return hash(self.public_bytes(serialization.Encoding.DER))

    def fingerprint(self, algorithm):
        """Return the digest of the DER encoding computed with ``algorithm``."""
        h = hashes.Hash(algorithm, self._backend)
        h.update(self.public_bytes(serialization.Encoding.DER))
        return h.finalize()

    @property
    def version(self):
        """Return the ``x509.Version`` of the certificate.

        The raw value 0 maps to v1 and 2 maps to v3; any other value raises
        ``x509.InvalidVersion``.
        """
        version = self._backend._lib.X509_get_version(self._x509)
        if version == 0:
            return x509.Version.v1
        elif version == 2:
            return x509.Version.v3
        else:
            raise x509.InvalidVersion(
                "{0} is not a valid X509 version".format(version), version
            )

    @property
    def serial(self):
        """Deprecated alias for ``serial_number``; emits a warning."""
        warnings.warn(
            "Certificate serial is deprecated, use serial_number instead.",
            utils.PersistentlyDeprecated,
            stacklevel=2
        )
        return self.serial_number

    @property
    def serial_number(self):
        """Return the certificate serial number as an integer."""
        asn1_int = self._backend._lib.X509_get_serialNumber(self._x509)
        self._backend.openssl_assert(asn1_int != self._backend._ffi.NULL)
        return _asn1_integer_to_int(self._backend, asn1_int)

    def public_key(self):
        """Return the certificate's public key.

        Raises ``ValueError`` when ``X509_get_pubkey`` returns NULL.
        """
        pkey = self._backend._lib.X509_get_pubkey(self._x509)
        if pkey == self._backend._ffi.NULL:
            # Remove errors from the stack.
            self._backend._consume_errors()
            raise ValueError("Certificate public key is of an unknown type")

        pkey = self._backend._ffi.gc(pkey, self._backend._lib.EVP_PKEY_free)

        return self._backend._evp_pkey_to_public_key(pkey)

    @property
    def not_valid_before(self):
        """Return the parsed ``notBefore`` time of the certificate."""
        asn1_time = self._backend._lib.X509_get_notBefore(self._x509)
        return _parse_asn1_time(self._backend, asn1_time)

    @property
    def not_valid_after(self):
        """Return the parsed ``notAfter`` time of the certificate."""
        asn1_time = self._backend._lib.X509_get_notAfter(self._x509)
        return _parse_asn1_time(self._backend, asn1_time)

    @property
    def issuer(self):
        """Return the decoded issuer name."""
        issuer = self._backend._lib.X509_get_issuer_name(self._x509)
        self._backend.openssl_assert(issuer != self._backend._ffi.NULL)
        return _decode_x509_name(self._backend, issuer)

    @property
    def subject(self):
        """Return the decoded subject name."""
        subject = self._backend._lib.X509_get_subject_name(self._x509)
        self._backend.openssl_assert(subject != self._backend._ffi.NULL)
        return _decode_x509_name(self._backend, subject)

    @property
    def signature_hash_algorithm(self):
        """Return the hash algorithm mapped from the signature OID.

        Raises ``UnsupportedAlgorithm`` when the OID has no entry in
        ``x509._SIG_OIDS_TO_HASH``.
        """
        oid = self.signature_algorithm_oid
        try:
            return x509._SIG_OIDS_TO_HASH[oid]
        except KeyError:
            raise UnsupportedAlgorithm(
                "Signature algorithm OID:{0} not recognized".format(oid)
            )

    @property
    def signature_algorithm_oid(self):
        """Return the signature algorithm as an ``x509.ObjectIdentifier``."""
        alg = self._backend._ffi.new("X509_ALGOR **")
        # Only the algorithm out-parameter is requested; the signature
        # out-parameter is passed as NULL.
        self._backend._lib.X509_get0_signature(
            self._backend._ffi.NULL, alg, self._x509
        )
        self._backend.openssl_assert(alg[0] != self._backend._ffi.NULL)
        oid = _obj2txt(self._backend, alg[0].algorithm)
        return x509.ObjectIdentifier(oid)

    @utils.cached_property
    def extensions(self):
        """Return the parsed certificate extensions.

        The parser is chosen by ``CRYPTOGRAPHY_OPENSSL_110_OR_GREATER``; the
        ``_NO_SCT`` parser is used when that flag is false.
        """
        if self._backend._lib.CRYPTOGRAPHY_OPENSSL_110_OR_GREATER:
            return _CERTIFICATE_EXTENSION_PARSER.parse(
                self._backend, self._x509
            )
        else:
            return _CERTIFICATE_EXTENSION_PARSER_NO_SCT.parse(
                self._backend, self._x509
            )

    @property
    def signature(self):
        """Return the certificate signature as bytes."""
        sig = self._backend._ffi.new("ASN1_BIT_STRING **")
        # Only the signature out-parameter is requested; the algorithm
        # out-parameter is passed as NULL.
        self._backend._lib.X509_get0_signature(
            sig, self._backend._ffi.NULL, self._x509
        )
        self._backend.openssl_assert(sig[0] != self._backend._ffi.NULL)
        return _asn1_string_to_bytes(self._backend, sig[0])

    @property
    def tbs_certificate_bytes(self):
        """Return the bytes produced by ``i2d_re_X509_tbs``."""
        pp = self._backend._ffi.new("unsigned char **")
        res = self._backend._lib.i2d_re_X509_tbs(self._x509, pp)
        self._backend.openssl_assert(res > 0)
        # Free the buffer written into pp[0] once ``pp`` is collected.
        pp = self._backend._ffi.gc(
            pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0])
        )
        # Slice with [:] to copy the data out of the C buffer.
        return self._backend._ffi.buffer(pp[0], res)[:]

    def public_bytes(self, encoding):
        """Serialize the certificate as PEM or DER.

        Raises ``TypeError`` for any other ``encoding`` value.
        """
        bio = self._backend._create_mem_bio_gc()
        if encoding is serialization.Encoding.PEM:
            res = self._backend._lib.PEM_write_bio_X509(bio, self._x509)
        elif encoding is serialization.Encoding.DER:
            res = self._backend._lib.i2d_X509_bio(bio, self._x509)
        else:
            raise TypeError("encoding must be an item from the Encoding enum")

        self._backend.openssl_assert(res == 1)
        return self._backend._read_mem_bio(bio)


@utils.register_interface(x509.RevokedCertificate)
class _RevokedCertificate(object):
    """``x509.RevokedCertificate`` implementation for one CRL entry."""

    def __init__(self, backend, crl, x509_revoked):
        self._backend = backend
        # The X509_REVOKED_value is a X509_REVOKED * that has
        # no reference counting. This means when X509_CRL_free is
        # called then the CRL and all X509_REVOKED * are freed. Since
        # you can retain a reference to a single revoked certificate
        # and let the CRL fall out of scope we need to retain a
        # private reference to the CRL inside the RevokedCertificate
        # object to prevent the gc from being called inappropriately.
        self._crl = crl
        self._x509_revoked = x509_revoked

    @property
    def serial_number(self):
        """Return the revoked certificate's serial number as an integer."""
        asn1_int = self._backend._lib.X509_REVOKED_get0_serialNumber(
            self._x509_revoked
        )
        self._backend.openssl_assert(asn1_int != self._backend._ffi.NULL)
        return _asn1_integer_to_int(self._backend, asn1_int)

    @property
    def revocation_date(self):
        """Return the parsed revocation date of the entry."""
        return _parse_asn1_time(
            self._backend,
            self._backend._lib.X509_REVOKED_get0_revocationDate(
                self._x509_revoked
            )
        )

    @utils.cached_property
    def extensions(self):
        """Return the parsed extensions of the revoked entry."""
        return _REVOKED_CERTIFICATE_EXTENSION_PARSER.parse(
            self._backend, self._x509_revoked
        )


@utils.register_interface(x509.CertificateRevocationList)
class _CertificateRevocationList(object):
    """``x509.CertificateRevocationList`` wrapping a backend CRL handle."""

    def __init__(self, backend, x509_crl):
        self._backend = backend
        self._x509_crl = x509_crl

    def __eq__(self, other):
        """Compare two CRLs with ``X509_CRL_cmp``.

        Returns ``NotImplemented`` when ``other`` is not an
        ``x509.CertificateRevocationList``.
        """
        if not isinstance(other, x509.CertificateRevocationList):
            return NotImplemented

        res = self._backend._lib.X509_CRL_cmp(self._x509_crl, other._x509_crl)
        return res == 0

    def __ne__(self, other):
        return not self == other

    def fingerprint(self, algorithm):
        """Return the digest of the DER encoding computed with ``algorithm``."""
        h = hashes.Hash(algorithm, self._backend)
        bio = self._backend._create_mem_bio_gc()
        res = self._backend._lib.i2d_X509_CRL_bio(
            bio, self._x509_crl
        )
        self._backend.openssl_assert(res == 1)
        der = self._backend._read_mem_bio(bio)
        h.update(der)
        return h.finalize()

    @utils.cached_property
    def _sorted_crl(self):
        # X509_CRL_get0_by_serial sorts in place, which breaks a variety of
        # things we don't want to break (like iteration and the signature).
        # Let's dupe it and sort that instead.
        dup = self._backend._lib.X509_CRL_dup(self._x509_crl)
        self._backend.openssl_assert(dup != self._backend._ffi.NULL)
        dup = self._backend._ffi.gc(dup, self._backend._lib.X509_CRL_free)
        return dup

    def get_revoked_certificate_by_serial_number(self, serial_number):
        """Look up a revoked entry by serial number.

        Returns ``None`` when ``X509_CRL_get0_by_serial`` reports no match,
        otherwise a ``_RevokedCertificate`` that keeps the duplicated CRL
        alive.
        """
        revoked = self._backend._ffi.new("X509_REVOKED **")
        asn1_int = _encode_asn1_int_gc(self._backend, serial_number)
        res = self._backend._lib.X509_CRL_get0_by_serial(
            self._sorted_crl, revoked, asn1_int
        )
        if res == 0:
            return None
        else:
            self._backend.openssl_assert(
                revoked[0] != self._backend._ffi.NULL
            )
            return _RevokedCertificate(
                self._backend, self._sorted_crl, revoked[0]
            )

    @property
    def signature_hash_algorithm(self):
        """Return the hash algorithm mapped from the signature OID.

        Raises ``UnsupportedAlgorithm`` when the OID has no entry in
        ``x509._SIG_OIDS_TO_HASH``.
        """
        oid = self.signature_algorithm_oid
        try:
            return x509._SIG_OIDS_TO_HASH[oid]
        except KeyError:
            raise UnsupportedAlgorithm(
                "Signature algorithm OID:{0} not recognized".format(oid)
            )

    @property
    def signature_algorithm_oid(self):
        """Return the signature algorithm as an ``x509.ObjectIdentifier``."""
        alg = self._backend._ffi.new("X509_ALGOR **")
        self._backend._lib.X509_CRL_get0_signature(
            self._x509_crl, self._backend._ffi.NULL, alg
        )
        self._backend.openssl_assert(alg[0] != self._backend._ffi.NULL)
        oid = _obj2txt(self._backend, alg[0].algorithm)
        return x509.ObjectIdentifier(oid)

    @property
    def issuer(self):
        """Return the decoded issuer name of the CRL."""
        issuer = self._backend._lib.X509_CRL_get_issuer(self._x509_crl)
        self._backend.openssl_assert(issuer != self._backend._ffi.NULL)
        return _decode_x509_name(self._backend, issuer)

    @property
    def next_update(self):
        """Return the parsed ``nextUpdate`` time of the CRL."""
        nu = self._backend._lib.X509_CRL_get_nextUpdate(self._x509_crl)
        self._backend.openssl_assert(nu != self._backend._ffi.NULL)
        return _parse_asn1_time(self._backend, nu)

    @property
    def last_update(self):
        """Return the parsed ``lastUpdate`` time of the CRL."""
        lu = self._backend._lib.X509_CRL_get_lastUpdate(self._x509_crl)
        self._backend.openssl_assert(lu != self._backend._ffi.NULL)
        return _parse_asn1_time(self._backend, lu)

    @property
    def signature(self):
        """Return the CRL signature as bytes."""
        sig = self._backend._ffi.new("ASN1_BIT_STRING **")
        self._backend._lib.X509_CRL_get0_signature(
            self._x509_crl, sig, self._backend._ffi.NULL
        )
        self._backend.openssl_assert(sig[0] != self._backend._ffi.NULL)
        return _asn1_string_to_bytes(self._backend, sig[0])

    @property
    def tbs_certlist_bytes(self):
        """Return the bytes produced by ``i2d_re_X509_CRL_tbs``."""
        pp = self._backend._ffi.new("unsigned char **")
        res = self._backend._lib.i2d_re_X509_CRL_tbs(self._x509_crl, pp)
        self._backend.openssl_assert(res > 0)
        # Free the buffer written into pp[0] once ``pp`` is collected.
        pp = self._backend._ffi.gc(
            pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0])
        )
        # Slice with [:] to copy the data out of the C buffer.
        return self._backend._ffi.buffer(pp[0], res)[:]

    def public_bytes(self, encoding):
        """Serialize the CRL as PEM or DER.

        Raises ``TypeError`` for any other ``encoding`` value.
        """
        bio = self._backend._create_mem_bio_gc()
        if encoding is serialization.Encoding.PEM:
            res = self._backend._lib.PEM_write_bio_X509_CRL(
                bio, self._x509_crl
            )
        elif encoding is serialization.Encoding.DER:
            res = self._backend._lib.i2d_X509_CRL_bio(bio, self._x509_crl)
        else:
            raise TypeError("encoding must be an item from the Encoding enum")

        self._backend.openssl_assert(res == 1)
        return self._backend._read_mem_bio(bio)

    def _revoked_cert(self, idx):
        # Wrap the idx-th stack entry; ``self`` is passed so the returned
        # object holds a reference to this CRL.
        revoked = self._backend._lib.X509_CRL_get_REVOKED(self._x509_crl)
        r = self._backend._lib.sk_X509_REVOKED_value(revoked, idx)
        self._backend.openssl_assert(r != self._backend._ffi.NULL)
        return _RevokedCertificate(self._backend, self, r)

    def __iter__(self):
        for i in range(len(self)):
            yield self._revoked_cert(i)

    def __getitem__(self, idx):
        """Return one revoked entry, or a list of entries for a slice.

        Negative integer indexes count from the end; an out-of-range index
        raises ``IndexError``.
        """
        if isinstance(idx, slice):
            start, stop, step = idx.indices(len(self))
            return [self._revoked_cert(i) for i in range(start, stop, step)]
        else:
            idx = operator.index(idx)
            if idx < 0:
                idx += len(self)
            if not 0 <= idx < len(self):
                raise IndexError
            return self._revoked_cert(idx)

    def __len__(self):
        revoked = self._backend._lib.X509_CRL_get_REVOKED(self._x509_crl)
        # A NULL stack is treated as an empty list of revoked certificates.
        if revoked == self._backend._ffi.NULL:
            return 0
        else:
            return self._backend._lib.sk_X509_REVOKED_num(revoked)

    @utils.cached_property
    def extensions(self):
        """Return the parsed CRL extensions."""
        return _CRL_EXTENSION_PARSER.parse(self._backend, self._x509_crl)

    def is_signature_valid(self, public_key):
        """Return whether ``X509_CRL_verify`` accepts ``public_key``.

        Raises ``TypeError`` when ``public_key`` is not a DSA, RSA or
        elliptic curve public key.
        """
        if not isinstance(public_key, (dsa.DSAPublicKey, rsa.RSAPublicKey,
                                       ec.EllipticCurvePublicKey)):
            raise TypeError('Expecting one of DSAPublicKey, RSAPublicKey,'
                            ' or EllipticCurvePublicKey.')
        res = self._backend._lib.X509_CRL_verify(
            self._x509_crl, public_key._evp_pkey
        )

        if res != 1:
            # Clear the error queue before reporting a failed verification.
            self._backend._consume_errors()
            return False

        return True


@utils.register_interface(x509.CertificateSigningRequest)
class _CertificateSigningRequest(object):
    """``x509.CertificateSigningRequest`` wrapping a backend REQ handle."""

    def __init__(self, backend, x509_req):
        self._backend = backend
        self._x509_req = x509_req

    def __eq__(self, other):
        """Compare two requests by their DER serialization.

        Returns ``NotImplemented`` when ``other`` is not a
        ``_CertificateSigningRequest``.
        """
        if not isinstance(other, _CertificateSigningRequest):
            return NotImplemented

        self_bytes = self.public_bytes(serialization.Encoding.DER)
        other_bytes = other.public_bytes(serialization.Encoding.DER)
        return self_bytes == other_bytes

    def __ne__(self, other):
        return not self == other

    def __hash__(self):
        # Hash the DER serialization, matching what __eq__ compares.
        return hash(self.public_bytes(serialization.Encoding.DER))

    def public_key(self):
        """Return the public key contained in the request."""
        pkey = self._backend._lib.X509_REQ_get_pubkey(self._x509_req)
        self._backend.openssl_assert(pkey != self._backend._ffi.NULL)
        pkey = self._backend._ffi.gc(pkey, self._backend._lib.EVP_PKEY_free)
        return self._backend._evp_pkey_to_public_key(pkey)

    @property
    def subject(self):
        """Return the decoded subject name of the request."""
        subject = self._backend._lib.X509_REQ_get_subject_name(self._x509_req)
        self._backend.openssl_assert(subject != self._backend._ffi.NULL)
        return _decode_x509_name(self._backend, subject)

    @property
    def signature_hash_algorithm(self):
        """Return the hash algorithm mapped from the signature OID.

        Raises ``UnsupportedAlgorithm`` when the OID has no entry in
        ``x509._SIG_OIDS_TO_HASH``.
        """
        oid = self.signature_algorithm_oid
        try:
            return x509._SIG_OIDS_TO_HASH[oid]
        except KeyError:
            raise UnsupportedAlgorithm(
                "Signature algorithm OID:{0} not recognized".format(oid)
            )

    @property
    def signature_algorithm_oid(self):
        """Return the signature algorithm as an ``x509.ObjectIdentifier``."""
        alg = self._backend._ffi.new("X509_ALGOR **")
        self._backend._lib.X509_REQ_get0_signature(
            self._x509_req, self._backend._ffi.NULL, alg
        )
        self._backend.openssl_assert(alg[0] != self._backend._ffi.NULL)
        oid = _obj2txt(self._backend, alg[0].algorithm)
        return x509.ObjectIdentifier(oid)

    @utils.cached_property
    def extensions(self):
        """Return the parsed extensions requested in the CSR."""
        x509_exts = self._backend._lib.X509_REQ_get_extensions(self._x509_req)
        # Release the extension stack and its elements when the handle is
        # collected.
        x509_exts = self._backend._ffi.gc(
            x509_exts,
            lambda x: self._backend._lib.sk_X509_EXTENSION_pop_free(
                x, self._backend._ffi.addressof(
                    self._backend._lib._original_lib, "X509_EXTENSION_free"
                )
            )
        )
        return _CSR_EXTENSION_PARSER.parse(self._backend, x509_exts)

    def public_bytes(self, encoding):
        """Serialize the request as PEM or DER.

        Raises ``TypeError`` for any other ``encoding`` value.
        """
        bio = self._backend._create_mem_bio_gc()
        if encoding is serialization.Encoding.PEM:
            res = self._backend._lib.PEM_write_bio_X509_REQ(
                bio, self._x509_req
            )
        elif encoding is serialization.Encoding.DER:
            res = self._backend._lib.i2d_X509_REQ_bio(bio, self._x509_req)
        else:
            raise TypeError("encoding must be an item from the Encoding enum")

        self._backend.openssl_assert(res == 1)
        return self._backend._read_mem_bio(bio)

    @property
    def tbs_certrequest_bytes(self):
        """Return the bytes produced by ``i2d_re_X509_REQ_tbs``."""
        pp = self._backend._ffi.new("unsigned char **")
        res = self._backend._lib.i2d_re_X509_REQ_tbs(self._x509_req, pp)
        self._backend.openssl_assert(res > 0)
        # Free the buffer written into pp[0] once ``pp`` is collected.
        pp = self._backend._ffi.gc(
            pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0])
        )
        # Slice with [:] to copy the data out of the C buffer.
        return self._backend._ffi.buffer(pp[0], res)[:]

    @property
    def signature(self):
        """Return the request signature as bytes."""
        sig = self._backend._ffi.new("ASN1_BIT_STRING **")
        self._backend._lib.X509_REQ_get0_signature(
            self._x509_req, sig, self._backend._ffi.NULL
        )
        self._backend.openssl_assert(sig[0] != self._backend._ffi.NULL)
        return _asn1_string_to_bytes(self._backend, sig[0])

    @property
    def is_signature_valid(self):
        """Return whether ``X509_REQ_verify`` accepts the request's own key."""
        pkey = self._backend._lib.X509_REQ_get_pubkey(self._x509_req)
        self._backend.openssl_assert(pkey != self._backend._ffi.NULL)
        pkey = self._backend._ffi.gc(pkey, self._backend._lib.EVP_PKEY_free)
        res = self._backend._lib.X509_REQ_verify(self._x509_req, pkey)

        if res != 1:
            # Clear the error queue before reporting a failed verification.
            self._backend._consume_errors()
            return False

        return True


@utils.register_interface(
    x509.certificate_transparency.SignedCertificateTimestamp
)
class _SignedCertificateTimestamp(object):
    """``SignedCertificateTimestamp`` implementation wrapping an SCT handle."""

    def __init__(self, backend, sct_list, sct):
        self._backend = backend
        # Keep the SCT_LIST that this SCT came from alive.
        self._sct_list = sct_list
        self._sct = sct

    @property
    def version(self):
        """Return ``Version.v1``.

        Raises ``AssertionError`` when the SCT reports any other version.
        """
        version = self._backend._lib.SCT_get_version(self._sct)
        # Raised explicitly rather than with an ``assert`` statement so the
        # check is still performed when Python runs with -O.
        if version != self._backend._lib.SCT_VERSION_V1:
            raise AssertionError("Unsupported SCT version")
        return x509.certificate_transparency.Version.v1

    @property
    def log_id(self):
        """Return the log ID bytes of the SCT."""
        out = self._backend._ffi.new("unsigned char **")
        log_id_length = self._backend._lib.SCT_get0_log_id(self._sct, out)
        # Explicit check (not ``assert``) so it is not stripped under -O.
        if not log_id_length >= 0:
            raise AssertionError("Invalid SCT log ID length")
        return self._backend._ffi.buffer(out[0], log_id_length)[:]

    @property
    def timestamp(self):
        """Return the SCT timestamp as a naive ``datetime``.

        The value from ``SCT_get_timestamp`` is treated as milliseconds: the
        whole seconds go through ``utcfromtimestamp`` and the remainder
        becomes the microsecond field.
        """
        timestamp = self._backend._lib.SCT_get_timestamp(self._sct)
        milliseconds = timestamp % 1000
        return datetime.datetime.utcfromtimestamp(
            timestamp // 1000
        ).replace(microsecond=milliseconds * 1000)

    @property
    def entry_type(self):
        """Return ``LogEntryType.PRE_CERTIFICATE``.

        Raises ``AssertionError`` when the SCT reports any other entry type.
        """
        entry_type = self._backend._lib.SCT_get_log_entry_type(self._sct)
        # We currently only support loading SCTs from the X.509 extension, so
        # we only have precerts.
        # Explicit check (not ``assert``) so it is not stripped under -O.
        if entry_type != self._backend._lib.CT_LOG_ENTRY_TYPE_PRECERT:
            raise AssertionError("Unsupported SCT log entry type")
        return x509.certificate_transparency.LogEntryType.PRE_CERTIFICATE

    @property
    def _signature(self):
        # Raw signature bytes; used by __hash__ and __eq__.
        ptrptr = self._backend._ffi.new("unsigned char **")
        res = self._backend._lib.SCT_get0_signature(self._sct, ptrptr)
        self._backend.openssl_assert(res > 0)
        self._backend.openssl_assert(ptrptr[0] != self._backend._ffi.NULL)
        return self._backend._ffi.buffer(ptrptr[0], res)[:]

    def __hash__(self):
        return hash(self._signature)

    def __eq__(self, other):
        """Compare two SCTs by their signature bytes.

        Returns ``NotImplemented`` when ``other`` is not a
        ``_SignedCertificateTimestamp``.
        """
        if not isinstance(other, _SignedCertificateTimestamp):
            return NotImplemented

        return self._signature == other._signature

    def __ne__(self, other):
        return not self == other