# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.

from __future__ import absolute_import, division, print_function

import datetime
import ipaddress

import six

from cryptography import x509
from cryptography.hazmat._der import DERReader, INTEGER, NULL, SEQUENCE
from cryptography.x509.extensions import _TLS_FEATURE_TYPE_TO_ENUM
from cryptography.x509.name import _ASN1_TYPE_TO_ENUM
from cryptography.x509.oid import (
    CRLEntryExtensionOID,
    CertificatePoliciesOID,
    ExtensionOID,
    OCSPExtensionOID,
)


def _obj2txt(backend, obj):
    """Return the text form of an ASN1_OBJECT as produced by OBJ_obj2txt.

    The call is made with ``no_name`` set to 1 and retried with a larger
    buffer when the first 80-byte buffer turns out to be too small.
    """
    # Set to 80 on the recommendation of
    # https://www.openssl.org/docs/crypto/OBJ_nid2ln.html#return_values
    #
    # But OIDs longer than this occur in real life (e.g. Active
    # Directory makes some very long OIDs).  So we need to detect
    # and properly handle the case where the default buffer is not
    # big enough.
    #
    buf_len = 80
    buf = backend._ffi.new("char[]", buf_len)

    # 'res' is the number of bytes that *would* be written if the
    # buffer is large enough.  If 'res' > buf_len - 1, we need to
    # alloc a big-enough buffer and go again.
    res = backend._lib.OBJ_obj2txt(buf, buf_len, obj, 1)
    if res > buf_len - 1:  # account for terminating null byte
        buf_len = res + 1
        buf = backend._ffi.new("char[]", buf_len)
        res = backend._lib.OBJ_obj2txt(buf, buf_len, obj, 1)
    backend.openssl_assert(res > 0)
    return backend._ffi.buffer(buf, res)[:].decode()


def _decode_x509_name_entry(backend, x509_name_entry):
    """Convert one X509_NAME_ENTRY into an ``x509.NameAttribute``."""
    obj = backend._lib.X509_NAME_ENTRY_get_object(x509_name_entry)
    backend.openssl_assert(obj != backend._ffi.NULL)
    data = backend._lib.X509_NAME_ENTRY_get_data(x509_name_entry)
    backend.openssl_assert(data != backend._ffi.NULL)
    value = _asn1_string_to_utf8(backend, data)
    oid = _obj2txt(backend, obj)
    # Named asn1_type (not "type") so the builtin is not shadowed.
    asn1_type = _ASN1_TYPE_TO_ENUM[data.type]

    return x509.NameAttribute(x509.ObjectIdentifier(oid), value, asn1_type)


def _decode_x509_name(backend, x509_name):
    """Convert an X509_NAME into an ``x509.Name``.

    Consecutive entries that report the same set id are grouped into one
    ``RelativeDistinguishedName``.
    """
    count = backend._lib.X509_NAME_entry_count(x509_name)
    attributes = []
    prev_set_id = -1
    for x in range(count):
        entry = backend._lib.X509_NAME_get_entry(x509_name, x)
        attribute = _decode_x509_name_entry(backend, entry)
        set_id = backend._lib.X509_NAME_ENTRY_set(entry)
        if set_id != prev_set_id:
            attributes.append({attribute})
        else:
            # is in the same RDN as a previous entry
            attributes[-1].add(attribute)
        prev_set_id = set_id

    return x509.Name(x509.RelativeDistinguishedName(rdn) for rdn in attributes)


def _decode_general_names(backend, gns):
    """Decode every entry of a GENERAL_NAME stack into a list."""
    num = backend._lib.sk_GENERAL_NAME_num(gns)
    names = []
    for i in range(num):
        gn = backend._lib.sk_GENERAL_NAME_value(gns, i)
        backend.openssl_assert(gn != backend._ffi.NULL)
        names.append(_decode_general_name(backend, gn))

    return names


def _decode_general_name(backend, gn):
    """Convert a single GENERAL_NAME into the matching ``x509`` object.

    Raises ``ValueError`` for an IP network whose netmask is not a
    contiguous run of one bits followed by zero bits, and
    ``x509.UnsupportedGeneralNameType`` for name types that are not handled
    here.
    """
    if gn.type == backend._lib.GEN_DNS:
        # Convert to bytes and then decode to utf8. We don't use
        # asn1_string_to_utf8 here because it doesn't properly convert
        # utf8 from ia5strings.
        data = _asn1_string_to_bytes(backend, gn.d.dNSName).decode("utf8")
        # We don't use the constructor for DNSName so we can bypass validation
        # This allows us to create DNSName objects that have unicode chars
        # when a certificate (against the RFC) contains them.
        return x509.DNSName._init_without_validation(data)
    elif gn.type == backend._lib.GEN_URI:
        # Convert to bytes and then decode to utf8. We don't use
        # asn1_string_to_utf8 here because it doesn't properly convert
        # utf8 from ia5strings.
        data = _asn1_string_to_bytes(
            backend, gn.d.uniformResourceIdentifier
        ).decode("utf8")
        # We don't use the constructor for URI so we can bypass validation
        # This allows us to create URI objects that have unicode chars
        # when a certificate (against the RFC) contains them.
        return x509.UniformResourceIdentifier._init_without_validation(data)
    elif gn.type == backend._lib.GEN_RID:
        oid = _obj2txt(backend, gn.d.registeredID)
        return x509.RegisteredID(x509.ObjectIdentifier(oid))
    elif gn.type == backend._lib.GEN_IPADD:
        data = _asn1_string_to_bytes(backend, gn.d.iPAddress)
        data_len = len(data)
        if data_len == 8 or data_len == 32:
            # This is an IPv4 or IPv6 Network and not a single IP. This
            # type of data appears in Name Constraints. Unfortunately,
            # ipaddress doesn't support packed bytes + netmask. Additionally,
            # IPv6Network can only handle CIDR rather than the full 16 byte
            # netmask. To handle this we convert the netmask to integer, then
            # find the first 0 bit, which will be the prefix. If another 1
            # bit is present after that the netmask is invalid.
            base = ipaddress.ip_address(data[: data_len // 2])
            netmask = ipaddress.ip_address(data[data_len // 2 :])
            # bin() drops leading zero bits, so pad back to the full mask
            # width (half of data_len bytes == data_len * 4 bits). Without
            # the padding a mask such as 0.0.0.255 would look like eight
            # contiguous one bits and be accepted as a /8.
            bits = bin(int(netmask))[2:].zfill(data_len * 4)
            prefix = bits.find("0")
            # If no 0 bits are found it is a /32 or /128
            if prefix == -1:
                prefix = len(bits)

            if "1" in bits[prefix:]:
                raise ValueError("Invalid netmask")

            ip = ipaddress.ip_network(base.exploded + u"/{}".format(prefix))
        else:
            ip = ipaddress.ip_address(data)

        return x509.IPAddress(ip)
    elif gn.type == backend._lib.GEN_DIRNAME:
        return x509.DirectoryName(
            _decode_x509_name(backend, gn.d.directoryName)
        )
    elif gn.type == backend._lib.GEN_EMAIL:
        # Convert to bytes and then decode to utf8. We don't use
        # asn1_string_to_utf8 here because it doesn't properly convert
        # utf8 from ia5strings.
        data = _asn1_string_to_bytes(backend, gn.d.rfc822Name).decode("utf8")
        # We don't use the constructor for RFC822Name so we can bypass
        # validation. This allows us to create RFC822Name objects that have
        # unicode chars when a certificate (against the RFC) contains them.
        return x509.RFC822Name._init_without_validation(data)
    elif gn.type == backend._lib.GEN_OTHERNAME:
        type_id = _obj2txt(backend, gn.d.otherName.type_id)
        value = _asn1_to_der(backend, gn.d.otherName.value)
        return x509.OtherName(x509.ObjectIdentifier(type_id), value)
    else:
        # x400Address or ediPartyName
        raise x509.UnsupportedGeneralNameType(
            "{} is not a supported type".format(
                x509._GENERAL_NAMES.get(gn.type, gn.type)
            ),
            gn.type,
        )


def _decode_ocsp_no_check(backend, ext):
    """Return ``x509.OCSPNoCheck()``; the extension payload is not read."""
    return x509.OCSPNoCheck()


def _decode_crl_number(backend, ext):
    """Decode an ASN1_INTEGER payload into ``x509.CRLNumber``.

    The pointer is registered with ``ffi.gc`` so ASN1_INTEGER_free runs when
    it is collected.
    """
    asn1_int = backend._ffi.cast("ASN1_INTEGER *", ext)
    asn1_int = backend._ffi.gc(asn1_int, backend._lib.ASN1_INTEGER_free)
    return x509.CRLNumber(_asn1_integer_to_int(backend, asn1_int))


def _decode_delta_crl_indicator(backend, ext):
    """Decode an ASN1_INTEGER payload into ``x509.DeltaCRLIndicator``."""
    asn1_int = backend._ffi.cast("ASN1_INTEGER *", ext)
    asn1_int = backend._ffi.gc(asn1_int, backend._lib.ASN1_INTEGER_free)
    return x509.DeltaCRLIndicator(_asn1_integer_to_int(backend, asn1_int))


class _X509ExtensionParser(object):
    """Walk the extensions of an OpenSSL object and build ``x509.Extensions``.

    ``ext_count`` and ``get_ext`` are callables used to enumerate the
    extensions of the object passed to ``parse``; ``handlers`` maps an
    extension OID to a ``handler(backend, ext_data)`` decoder.
    """

    def __init__(self, backend, ext_count, get_ext, handlers):
        self.ext_count = ext_count
        self.get_ext = get_ext
        self.handlers = handlers
        self._backend = backend

    def parse(self, x509_obj):
        """Decode all extensions of ``x509_obj``.

        Raises ``x509.DuplicateExtension`` when an OID appears twice and
        ``ValueError`` when OpenSSL cannot parse an extension that has a
        registered handler. Extensions with no handler are returned as
        ``x509.UnrecognizedExtension`` carrying the raw payload.
        """
        extensions = []
        seen_oids = set()
        for i in range(self.ext_count(x509_obj)):
            ext = self.get_ext(x509_obj, i)
            self._backend.openssl_assert(ext != self._backend._ffi.NULL)
            crit = self._backend._lib.X509_EXTENSION_get_critical(ext)
            critical = crit == 1
            oid = x509.ObjectIdentifier(
                _obj2txt(
                    self._backend,
                    self._backend._lib.X509_EXTENSION_get_object(ext),
                )
            )
            if oid in seen_oids:
                raise x509.DuplicateExtension(
                    "Duplicate {} extension found".format(oid), oid
                )

            # These OIDs are only supported in OpenSSL 1.1.0+ but we want
            # to support them in all versions of OpenSSL so we decode them
            # ourselves.
            if oid == ExtensionOID.TLS_FEATURE:
                # The extension contents are a SEQUENCE OF INTEGERs.
                data = self._backend._lib.X509_EXTENSION_get_data(ext)
                data_bytes = _asn1_string_to_bytes(self._backend, data)
                features = DERReader(data_bytes).read_single_element(SEQUENCE)
                parsed = []
                while not features.is_empty():
                    parsed.append(features.read_element(INTEGER).as_integer())
                # Map the features to their enum value.
                value = x509.TLSFeature(
                    [_TLS_FEATURE_TYPE_TO_ENUM[x] for x in parsed]
                )
                extensions.append(x509.Extension(oid, critical, value))
                seen_oids.add(oid)
                continue
            elif oid == ExtensionOID.PRECERT_POISON:
                data = self._backend._lib.X509_EXTENSION_get_data(ext)
                # The contents of the extension must be an ASN.1 NULL.
                reader = DERReader(_asn1_string_to_bytes(self._backend, data))
                reader.read_single_element(NULL).check_empty()
                extensions.append(
                    x509.Extension(oid, critical, x509.PrecertPoison())
                )
                seen_oids.add(oid)
                continue

            try:
                handler = self.handlers[oid]
            except KeyError:
                # Dump the DER payload into an UnrecognizedExtension object
                data = self._backend._lib.X509_EXTENSION_get_data(ext)
                self._backend.openssl_assert(data != self._backend._ffi.NULL)
                der = self._backend._ffi.buffer(data.data, data.length)[:]
                unrecognized = x509.UnrecognizedExtension(oid, der)
                extensions.append(x509.Extension(oid, critical, unrecognized))
            else:
                ext_data = self._backend._lib.X509V3_EXT_d2i(ext)
                if ext_data == self._backend._ffi.NULL:
                    self._backend._consume_errors()
                    raise ValueError(
                        "The {} extension is invalid and can't be "
                        "parsed".format(oid)
                    )

                value = handler(self._backend, ext_data)
                extensions.append(x509.Extension(oid, critical, value))

            seen_oids.add(oid)

        return x509.Extensions(extensions)


def _decode_certificate_policies(backend, cp):
    """Decode a POLICYINFO stack into ``x509.CertificatePolicies``.

    CPS qualifiers become ASCII-decoded strings and user-notice qualifiers
    become ``x509.UserNotice`` objects. Raises ``ValueError`` for any other
    qualifier type.
    """
    cp = backend._ffi.cast("Cryptography_STACK_OF_POLICYINFO *", cp)
    cp = backend._ffi.gc(cp, backend._lib.CERTIFICATEPOLICIES_free)

    num = backend._lib.sk_POLICYINFO_num(cp)
    certificate_policies = []
    for i in range(num):
        qualifiers = None
        pi = backend._lib.sk_POLICYINFO_value(cp, i)
        oid = x509.ObjectIdentifier(_obj2txt(backend, pi.policyid))
        if pi.qualifiers != backend._ffi.NULL:
            qnum = backend._lib.sk_POLICYQUALINFO_num(pi.qualifiers)
            qualifiers = []
            for j in range(qnum):
                pqi = backend._lib.sk_POLICYQUALINFO_value(pi.qualifiers, j)
                pqualid = x509.ObjectIdentifier(_obj2txt(backend, pqi.pqualid))
                if pqualid == CertificatePoliciesOID.CPS_QUALIFIER:
                    cpsuri = backend._ffi.buffer(
                        pqi.d.cpsuri.data, pqi.d.cpsuri.length
                    )[:].decode("ascii")
                    qualifiers.append(cpsuri)
                elif pqualid == CertificatePoliciesOID.CPS_USER_NOTICE:
                    user_notice = _decode_user_notice(
                        backend, pqi.d.usernotice
                    )
                    qualifiers.append(user_notice)
                else:
                    # The qualifier id comes from the certificate, so it is
                    # checked with a real branch rather than an assert: an
                    # assert disappears under "python -O" and the union
                    # would then be read through the wrong member.
                    raise ValueError(
                        "Unsupported policy qualifier: {}".format(pqualid)
                    )

        certificate_policies.append(x509.PolicyInformation(oid, qualifiers))

    return x509.CertificatePolicies(certificate_policies)


def _decode_user_notice(backend, un):
    """Convert a USERNOTICE into ``x509.UserNotice``.

    The explicit text and the notice reference are each ``None`` when the
    corresponding pointer is NULL.
    """
    explicit_text = None
    notice_reference = None

    if un.exptext != backend._ffi.NULL:
        explicit_text = _asn1_string_to_utf8(backend, un.exptext)

    if un.noticeref != backend._ffi.NULL:
        organization = _asn1_string_to_utf8(backend, un.noticeref.organization)

        num = backend._lib.sk_ASN1_INTEGER_num(un.noticeref.noticenos)
        notice_numbers = []
        for i in range(num):
            asn1_int = backend._lib.sk_ASN1_INTEGER_value(
                un.noticeref.noticenos, i
            )
            notice_num = _asn1_integer_to_int(backend, asn1_int)
            notice_numbers.append(notice_num)

        notice_reference = x509.NoticeReference(organization, notice_numbers)

    return x509.UserNotice(notice_reference, explicit_text)


def _decode_basic_constraints(backend, bc_st):
    """Decode a BASIC_CONSTRAINTS struct into ``x509.BasicConstraints``."""
    basic_constraints = backend._ffi.cast("BASIC_CONSTRAINTS *", bc_st)
    basic_constraints = backend._ffi.gc(
        basic_constraints, backend._lib.BASIC_CONSTRAINTS_free
    )
    # The byte representation of an ASN.1 boolean true is \xff. OpenSSL
    # chooses to just map this to its ordinal value, so true is 255 and
    # false is 0.
    ca = basic_constraints.ca == 255
    path_length = _asn1_integer_to_int_or_none(
        backend, basic_constraints.pathlen
    )

    return x509.BasicConstraints(ca, path_length)


def _decode_subject_key_identifier(backend, asn1_string):
    """Copy an ASN1_OCTET_STRING into ``x509.SubjectKeyIdentifier``."""
    asn1_string = backend._ffi.cast("ASN1_OCTET_STRING *", asn1_string)
    asn1_string = backend._ffi.gc(
        asn1_string, backend._lib.ASN1_OCTET_STRING_free
    )
    return x509.SubjectKeyIdentifier(
        backend._ffi.buffer(asn1_string.data, asn1_string.length)[:]
    )


def _decode_authority_key_identifier(backend, akid):
    """Decode an AUTHORITY_KEYID into ``x509.AuthorityKeyIdentifier``.

    Each of the three components is ``None`` when its pointer is NULL.
    """
    akid = backend._ffi.cast("AUTHORITY_KEYID *", akid)
    akid = backend._ffi.gc(akid, backend._lib.AUTHORITY_KEYID_free)
    key_identifier = None
    authority_cert_issuer = None

    if akid.keyid != backend._ffi.NULL:
        key_identifier = backend._ffi.buffer(
            akid.keyid.data, akid.keyid.length
        )[:]

    if akid.issuer != backend._ffi.NULL:
        authority_cert_issuer = _decode_general_names(backend, akid.issuer)

    authority_cert_serial_number = _asn1_integer_to_int_or_none(
        backend, akid.serial
    )

    return x509.AuthorityKeyIdentifier(
        key_identifier, authority_cert_issuer, authority_cert_serial_number
    )


def _decode_information_access(backend, ia):
    """Decode an ACCESS_DESCRIPTION stack into ``x509.AccessDescription``s.

    Returns a plain list; the callers wrap it in the concrete extension
    type.
    """
    ia = backend._ffi.cast("Cryptography_STACK_OF_ACCESS_DESCRIPTION *", ia)
    # Free the stack together with each ACCESS_DESCRIPTION it holds.
    ia = backend._ffi.gc(
        ia,
        lambda x: backend._lib.sk_ACCESS_DESCRIPTION_pop_free(
            x,
            backend._ffi.addressof(
                backend._lib._original_lib, "ACCESS_DESCRIPTION_free"
            ),
        ),
    )
    num = backend._lib.sk_ACCESS_DESCRIPTION_num(ia)
    access_descriptions = []
    for i in range(num):
        ad = backend._lib.sk_ACCESS_DESCRIPTION_value(ia, i)
        backend.openssl_assert(ad.method != backend._ffi.NULL)
        oid = x509.ObjectIdentifier(_obj2txt(backend, ad.method))
        backend.openssl_assert(ad.location != backend._ffi.NULL)
        gn = _decode_general_name(backend, ad.location)
        access_descriptions.append(x509.AccessDescription(oid, gn))

    return access_descriptions


def _decode_authority_information_access(backend, aia):
    """Decode the payload into ``x509.AuthorityInformationAccess``."""
    access_descriptions = _decode_information_access(backend, aia)
    return x509.AuthorityInformationAccess(access_descriptions)


def _decode_subject_information_access(backend, aia):
    """Decode the payload into ``x509.SubjectInformationAccess``."""
    access_descriptions = _decode_information_access(backend, aia)
    return x509.SubjectInformationAccess(access_descriptions)


def _decode_key_usage(backend, bit_string):
    """Decode an ASN1_BIT_STRING into ``x509.KeyUsage``.

    Bits 0 through 8 are read in order and passed positionally.
    """
    bit_string = backend._ffi.cast("ASN1_BIT_STRING *", bit_string)
    bit_string = backend._ffi.gc(bit_string, backend._lib.ASN1_BIT_STRING_free)
    get_bit = backend._lib.ASN1_BIT_STRING_get_bit
    digital_signature = get_bit(bit_string, 0) == 1
    content_commitment = get_bit(bit_string, 1) == 1
    key_encipherment = get_bit(bit_string, 2) == 1
    data_encipherment = get_bit(bit_string, 3) == 1
    key_agreement = get_bit(bit_string, 4) == 1
    key_cert_sign = get_bit(bit_string, 5) == 1
    crl_sign = get_bit(bit_string, 6) == 1
    encipher_only = get_bit(bit_string, 7) == 1
    decipher_only = get_bit(bit_string, 8) == 1
    return x509.KeyUsage(
        digital_signature,
        content_commitment,
        key_encipherment,
        data_encipherment,
        key_agreement,
        key_cert_sign,
        crl_sign,
        encipher_only,
        decipher_only,
    )


def _decode_general_names_extension(backend, gns):
    """Decode a GENERAL_NAMES extension payload into a list of names."""
    gns = backend._ffi.cast("GENERAL_NAMES *", gns)
    gns = backend._ffi.gc(gns, backend._lib.GENERAL_NAMES_free)
    general_names = _decode_general_names(backend, gns)
    return general_names


def _decode_subject_alt_name(backend, ext):
    """Decode the payload into ``x509.SubjectAlternativeName``."""
    return x509.SubjectAlternativeName(
        _decode_general_names_extension(backend, ext)
    )


def _decode_issuer_alt_name(backend, ext):
    """Decode the payload into ``x509.IssuerAlternativeName``."""
    return x509.IssuerAlternativeName(
        _decode_general_names_extension(backend, ext)
    )


def _decode_name_constraints(backend, nc):
    """Decode a NAME_CONSTRAINTS struct into ``x509.NameConstraints``."""
    nc = backend._ffi.cast("NAME_CONSTRAINTS *", nc)
    nc = backend._ffi.gc(nc, backend._lib.NAME_CONSTRAINTS_free)
    permitted = _decode_general_subtrees(backend, nc.permittedSubtrees)
    excluded = _decode_general_subtrees(backend, nc.excludedSubtrees)
    return x509.NameConstraints(
        permitted_subtrees=permitted, excluded_subtrees=excluded
    )


def _decode_general_subtrees(backend, stack_subtrees):
    """Decode the base name of each GENERAL_SUBTREE in a stack.

    Returns ``None`` when the stack pointer is NULL.
    """
    if stack_subtrees == backend._ffi.NULL:
        return None

    num = backend._lib.sk_GENERAL_SUBTREE_num(stack_subtrees)
    subtrees = []

    for i in range(num):
        obj = backend._lib.sk_GENERAL_SUBTREE_value(stack_subtrees, i)
        backend.openssl_assert(obj != backend._ffi.NULL)
        name = _decode_general_name(backend, obj.base)
        subtrees.append(name)

    return subtrees


def _decode_issuing_dist_point(backend, idp):
    """Decode an ISSUING_DIST_POINT into ``x509.IssuingDistributionPoint``.

    Boolean fields are true only when the struct member equals 255.
    """
    idp = backend._ffi.cast("ISSUING_DIST_POINT *", idp)
    idp = backend._ffi.gc(idp, backend._lib.ISSUING_DIST_POINT_free)
    if idp.distpoint != backend._ffi.NULL:
        full_name, relative_name = _decode_distpoint(backend, idp.distpoint)
    else:
        full_name = None
        relative_name = None

    only_user = idp.onlyuser == 255
    only_ca = idp.onlyCA == 255
    indirect_crl = idp.indirectCRL == 255
    only_attr = idp.onlyattr == 255
    if idp.onlysomereasons != backend._ffi.NULL:
        only_some_reasons = _decode_reasons(backend, idp.onlysomereasons)
    else:
        only_some_reasons = None

    return x509.IssuingDistributionPoint(
        full_name,
        relative_name,
        only_user,
        only_ca,
        only_some_reasons,
        indirect_crl,
        only_attr,
    )


def _decode_policy_constraints(backend, pc):
    """Build ``x509.PolicyConstraints`` from a POLICY_CONSTRAINTS pointer.

    Either field is ``None`` when its ASN1_INTEGER pointer is NULL.
    """
    constraints = backend._ffi.gc(
        backend._ffi.cast("POLICY_CONSTRAINTS *", pc),
        backend._lib.POLICY_CONSTRAINTS_free,
    )
    explicit_policy = _asn1_integer_to_int_or_none(
        backend, constraints.requireExplicitPolicy
    )
    policy_mapping = _asn1_integer_to_int_or_none(
        backend, constraints.inhibitPolicyMapping
    )
    return x509.PolicyConstraints(explicit_policy, policy_mapping)


def _decode_extended_key_usage(backend, sk):
    """Build ``x509.ExtendedKeyUsage`` from a stack of ASN1_OBJECTs."""
    stack = backend._ffi.gc(
        backend._ffi.cast("Cryptography_STACK_OF_ASN1_OBJECT *", sk),
        backend._lib.sk_ASN1_OBJECT_free,
    )
    usages = []
    for index in range(backend._lib.sk_ASN1_OBJECT_num(stack)):
        asn1_obj = backend._lib.sk_ASN1_OBJECT_value(stack, index)
        backend.openssl_assert(asn1_obj != backend._ffi.NULL)
        usages.append(x509.ObjectIdentifier(_obj2txt(backend, asn1_obj)))

    return x509.ExtendedKeyUsage(usages)


_DISTPOINT_TYPE_FULLNAME = 0
_DISTPOINT_TYPE_RELATIVENAME = 1


def _decode_dist_points(backend, cdps):
    """Decode a DIST_POINT stack into a list of ``x509.DistributionPoint``.

    Fields whose pointer is NULL are reported as ``None``.
    """
    null = backend._ffi.NULL
    stack = backend._ffi.gc(
        backend._ffi.cast("Cryptography_STACK_OF_DIST_POINT *", cdps),
        backend._lib.CRL_DIST_POINTS_free,
    )

    decoded = []
    for index in range(backend._lib.sk_DIST_POINT_num(stack)):
        point = backend._lib.sk_DIST_POINT_value(stack, index)

        if point.reasons != null:
            reasons = _decode_reasons(backend, point.reasons)
        else:
            reasons = None

        if point.CRLissuer != null:
            crl_issuer = _decode_general_names(backend, point.CRLissuer)
        else:
            crl_issuer = None

        # A certificate may carry a crl_issuer/reasons with no distribution
        # point at all, so the pointer has to be checked before decoding.
        if point.distpoint != null:
            full_name, relative_name = _decode_distpoint(
                backend, point.distpoint
            )
        else:
            full_name = relative_name = None

        decoded.append(
            x509.DistributionPoint(
                full_name, relative_name, reasons, crl_issuer
            )
        )

    return decoded


# ReasonFlags ::= BIT STRING {
#      unused                  (0),
#      keyCompromise           (1),
#      cACompromise            (2),
#      affiliationChanged      (3),
#      superseded              (4),
#      cessationOfOperation    (5),
#      certificateHold         (6),
#      privilegeWithdrawn      (7),
#      aACompromise            (8) }
_REASON_BIT_MAPPING = {
    1: x509.ReasonFlags.key_compromise,
    2: x509.ReasonFlags.ca_compromise,
    3: x509.ReasonFlags.affiliation_changed,
    4: x509.ReasonFlags.superseded,
    5: x509.ReasonFlags.cessation_of_operation,
    6: x509.ReasonFlags.certificate_hold,
    7: x509.ReasonFlags.privilege_withdrawn,
    8: x509.ReasonFlags.aa_compromise,
}


def _decode_reasons(backend, reasons):
    """Return the frozenset of ``ReasonFlags`` whose bit is set."""
    get_bit = backend._lib.ASN1_BIT_STRING_get_bit
    # Every bit position named in RFC 5280 is probed.
    return frozenset(
        flag
        for position, flag in six.iteritems(_REASON_BIT_MAPPING)
        if get_bit(reasons, position)
    )


def _decode_distpoint(backend, distpoint):
    """Decode a DIST_POINT_NAME into a ``(full_name, relative_name)`` pair.

    Exactly one element of the pair is populated; the other is ``None``.
    """
    # OpenSSL code doesn't test for a specific type for
    # relativename, everything that isn't fullname is considered
    # relativename.  Per RFC 5280:
    #
    # DistributionPointName ::= CHOICE {
    #      fullName                [0]      GeneralNames,
    #      nameRelativeToCRLIssuer [1]      RelativeDistinguishedName }
    if distpoint.type != _DISTPOINT_TYPE_FULLNAME:
        entries = distpoint.name.relativename
        collected = set()
        for index in range(backend._lib.sk_X509_NAME_ENTRY_num(entries)):
            entry = backend._lib.sk_X509_NAME_ENTRY_value(entries, index)
            backend.openssl_assert(entry != backend._ffi.NULL)
            collected.add(_decode_x509_name_entry(backend, entry))

        return None, x509.RelativeDistinguishedName(collected)

    return _decode_general_names(backend, distpoint.name.fullname), None


def _decode_crl_distribution_points(backend, cdps):
    """Wrap the decoded distribution points in ``CRLDistributionPoints``."""
    return x509.CRLDistributionPoints(_decode_dist_points(backend, cdps))


def _decode_freshest_crl(backend, cdps):
    """Wrap the decoded distribution points in ``FreshestCRL``."""
    return x509.FreshestCRL(_decode_dist_points(backend, cdps))


def _decode_inhibit_any_policy(backend, asn1_int):
    """Build ``x509.InhibitAnyPolicy`` from an ASN1_INTEGER pointer."""
    integer = backend._ffi.gc(
        backend._ffi.cast("ASN1_INTEGER *", asn1_int),
        backend._lib.ASN1_INTEGER_free,
    )
    return x509.InhibitAnyPolicy(_asn1_integer_to_int(backend, integer))


def _decode_scts(backend, asn1_scts):
    """Decode an SCT stack into ``_SignedCertificateTimestamp`` objects.

    The stack object is handed to every wrapper together with its own SCT
    pointer.
    """
    from cryptography.hazmat.backends.openssl.x509 import (
        _SignedCertificateTimestamp,
    )

    stack = backend._ffi.gc(
        backend._ffi.cast("Cryptography_STACK_OF_SCT *", asn1_scts),
        backend._lib.SCT_LIST_free,
    )

    timestamps = []
    for index in range(backend._lib.sk_SCT_num(stack)):
        raw_sct = backend._lib.sk_SCT_value(stack, index)
        timestamps.append(_SignedCertificateTimestamp(backend, stack, raw_sct))
    return timestamps


def _decode_precert_signed_certificate_timestamps(backend, asn1_scts):
    """Wrap decoded SCTs in ``PrecertificateSignedCertificateTimestamps``."""
    scts = _decode_scts(backend, asn1_scts)
    return x509.PrecertificateSignedCertificateTimestamps(scts)


def _decode_signed_certificate_timestamps(backend, asn1_scts):
    """Wrap decoded SCTs in ``SignedCertificateTimestamps``."""
    scts = _decode_scts(backend, asn1_scts)
    return x509.SignedCertificateTimestamps(scts)


# CRLReason ::= ENUMERATED {
#      unspecified             (0),
#      keyCompromise           (1),
#      cACompromise            (2),
#      affiliationChanged      (3),
#      superseded              (4),
#      cessationOfOperation    (5),
#      certificateHold         (6),
#           -- value 7 is not used
#      removeFromCRL           (8),
#      privilegeWithdrawn      (9),
#      aACompromise           (10) }
_CRL_ENTRY_REASON_CODE_TO_ENUM = {
    0: x509.ReasonFlags.unspecified,
    1: x509.ReasonFlags.key_compromise,
    2: x509.ReasonFlags.ca_compromise,
    3: x509.ReasonFlags.affiliation_changed,
    4: x509.ReasonFlags.superseded,
    5: x509.ReasonFlags.cessation_of_operation,
    6: x509.ReasonFlags.certificate_hold,
    8: x509.ReasonFlags.remove_from_crl,
    9: x509.ReasonFlags.privilege_withdrawn,
    10: x509.ReasonFlags.aa_compromise,
}


# Reverse lookup of the table above (reason enum -> numeric code).
_CRL_ENTRY_REASON_ENUM_TO_CODE = {
    x509.ReasonFlags.unspecified: 0,
    x509.ReasonFlags.key_compromise: 1,
    x509.ReasonFlags.ca_compromise: 2,
    x509.ReasonFlags.affiliation_changed: 3,
    x509.ReasonFlags.superseded: 4,
    x509.ReasonFlags.cessation_of_operation: 5,
    x509.ReasonFlags.certificate_hold: 6,
    x509.ReasonFlags.remove_from_crl: 8,
    x509.ReasonFlags.privilege_withdrawn: 9,
    x509.ReasonFlags.aa_compromise: 10,
}


def _decode_crl_reason(backend, enum):
    """Build ``x509.CRLReason`` from an ASN1_ENUMERATED pointer.

    Raises ``ValueError`` when the code is not in the reason table.
    """
    enumerated = backend._ffi.gc(
        backend._ffi.cast("ASN1_ENUMERATED *", enum),
        backend._lib.ASN1_ENUMERATED_free,
    )
    code = backend._lib.ASN1_ENUMERATED_get(enumerated)

    try:
        return x509.CRLReason(_CRL_ENTRY_REASON_CODE_TO_ENUM[code])
    except KeyError:
        raise ValueError("Unsupported reason code: {}".format(code))


def _decode_invalidity_date(backend, inv_date):
    """Build ``x509.InvalidityDate`` from an ASN1_GENERALIZEDTIME pointer."""
    timestamp = backend._ffi.gc(
        backend._ffi.cast("ASN1_GENERALIZEDTIME *", inv_date),
        backend._lib.ASN1_GENERALIZEDTIME_free,
    )
    parsed = _parse_asn1_generalized_time(backend, timestamp)
    return x509.InvalidityDate(parsed)


def _decode_cert_issuer(backend, gns):
    """Build ``x509.CertificateIssuer`` from a GENERAL_NAMES pointer."""
    names = backend._ffi.gc(
        backend._ffi.cast("GENERAL_NAMES *", gns),
        backend._lib.GENERAL_NAMES_free,
    )
    return x509.CertificateIssuer(_decode_general_names(backend, names))


def _asn1_to_der(backend, asn1_type):
    """Serialize an ASN1_TYPE with i2d_ASN1_TYPE and return the bytes."""
    out = backend._ffi.new("unsigned char **")
    length = backend._lib.i2d_ASN1_TYPE(asn1_type, out)
    backend.openssl_assert(length >= 0)
    backend.openssl_assert(out[0] != backend._ffi.NULL)
    # Release the OpenSSL-allocated buffer once the holder is collected.
    out = backend._ffi.gc(out, lambda ptr: backend._lib.OPENSSL_free(ptr[0]))
    return backend._ffi.buffer(out[0], length)[:]


def _asn1_integer_to_int(backend, asn1_int):
    """Convert an ASN1_INTEGER to a Python integer by way of a BIGNUM."""
    bignum = backend._lib.ASN1_INTEGER_to_BN(asn1_int, backend._ffi.NULL)
    backend.openssl_assert(bignum != backend._ffi.NULL)
    bignum = backend._ffi.gc(bignum, backend._lib.BN_free)
    return backend._bn_to_int(bignum)


def _asn1_integer_to_int_or_none(backend, asn1_int):
    """Like ``_asn1_integer_to_int`` but maps a NULL pointer to ``None``."""
    if asn1_int != backend._ffi.NULL:
        return _asn1_integer_to_int(backend, asn1_int)
    return None


def _asn1_string_to_bytes(backend, asn1_string):
    """Copy ``length`` bytes from the string's ``data`` pointer."""
    view = backend._ffi.buffer(asn1_string.data, asn1_string.length)
    return view[:]


def _asn1_string_to_ascii(backend, asn1_string):
    """Return the string's bytes decoded as ASCII."""
    raw = _asn1_string_to_bytes(backend, asn1_string)
    return raw.decode("ascii")


def _asn1_string_to_utf8(backend, asn1_string):
    """Convert an ASN1_STRING to text using ASN1_STRING_to_UTF8.

    Raises ``ValueError`` when OpenSSL reports -1 for the conversion.
    """
    out = backend._ffi.new("unsigned char **")
    length = backend._lib.ASN1_STRING_to_UTF8(out, asn1_string)
    if length == -1:
        raise ValueError(
            "Unsupported ASN1 string type. Type: {}".format(asn1_string.type)
        )

    backend.openssl_assert(out[0] != backend._ffi.NULL)
    # Release the OpenSSL-allocated buffer once the holder is collected.
    out = backend._ffi.gc(out, lambda ptr: backend._lib.OPENSSL_free(ptr[0]))
    return backend._ffi.buffer(out[0], length)[:].decode("utf8")


def _parse_asn1_time(backend, asn1_time):
    """Convert an ASN1_TIME to a ``datetime.datetime``.

    Raises ``ValueError`` when OpenSSL cannot convert the value to a
    generalized time.
    """
    null = backend._ffi.NULL
    backend.openssl_assert(asn1_time != null)
    converted = backend._lib.ASN1_TIME_to_generalizedtime(asn1_time, null)
    if converted == null:
        raise ValueError(
            "Couldn't parse ASN.1 time as generalizedtime {!r}".format(
                _asn1_string_to_bytes(backend, asn1_time)
            )
        )

    converted = backend._ffi.gc(
        converted, backend._lib.ASN1_GENERALIZEDTIME_free
    )
    return _parse_asn1_generalized_time(backend, converted)


def _parse_asn1_generalized_time(backend, generalized_time):
    """Parse a ``YYYYMMDDHHMMSSZ`` generalized time into a ``datetime``."""
    as_string = backend._ffi.cast("ASN1_STRING *", generalized_time)
    text = _asn1_string_to_ascii(backend, as_string)
    return datetime.datetime.strptime(text, "%Y%m%d%H%M%SZ")


def _decode_nonce(backend, nonce):
    """Build ``x509.OCSPNonce`` from an ASN1_OCTET_STRING pointer."""
    octets = backend._ffi.gc(
        backend._ffi.cast("ASN1_OCTET_STRING *", nonce),
        backend._lib.ASN1_OCTET_STRING_free,
    )
    return x509.OCSPNonce(_asn1_string_to_bytes(backend, octets))


# Extension OID -> decoder tables. Each decoder is called as
# ``decoder(backend, ext_data)``.
_EXTENSION_HANDLERS_BASE = {
    ExtensionOID.BASIC_CONSTRAINTS: _decode_basic_constraints,
    ExtensionOID.SUBJECT_KEY_IDENTIFIER: _decode_subject_key_identifier,
    ExtensionOID.KEY_USAGE: _decode_key_usage,
    ExtensionOID.SUBJECT_ALTERNATIVE_NAME: _decode_subject_alt_name,
    ExtensionOID.EXTENDED_KEY_USAGE: _decode_extended_key_usage,
    ExtensionOID.AUTHORITY_KEY_IDENTIFIER: _decode_authority_key_identifier,
    ExtensionOID.AUTHORITY_INFORMATION_ACCESS: (
        _decode_authority_information_access
    ),
    ExtensionOID.SUBJECT_INFORMATION_ACCESS: (
        _decode_subject_information_access
    ),
    ExtensionOID.CERTIFICATE_POLICIES: _decode_certificate_policies,
    ExtensionOID.CRL_DISTRIBUTION_POINTS: _decode_crl_distribution_points,
    ExtensionOID.FRESHEST_CRL: _decode_freshest_crl,
    ExtensionOID.OCSP_NO_CHECK: _decode_ocsp_no_check,
    ExtensionOID.INHIBIT_ANY_POLICY: _decode_inhibit_any_policy,
    ExtensionOID.ISSUER_ALTERNATIVE_NAME: _decode_issuer_alt_name,
    ExtensionOID.NAME_CONSTRAINTS: _decode_name_constraints,
    ExtensionOID.POLICY_CONSTRAINTS: _decode_policy_constraints,
}
_EXTENSION_HANDLERS_SCT = {
    ExtensionOID.PRECERT_SIGNED_CERTIFICATE_TIMESTAMPS: (
        _decode_precert_signed_certificate_timestamps
    )
}

_REVOKED_EXTENSION_HANDLERS = {
    CRLEntryExtensionOID.CRL_REASON: _decode_crl_reason,
    CRLEntryExtensionOID.INVALIDITY_DATE: _decode_invalidity_date,
    CRLEntryExtensionOID.CERTIFICATE_ISSUER: _decode_cert_issuer,
}

_CRL_EXTENSION_HANDLERS = {
    ExtensionOID.CRL_NUMBER: _decode_crl_number,
    ExtensionOID.DELTA_CRL_INDICATOR: _decode_delta_crl_indicator,
    ExtensionOID.AUTHORITY_KEY_IDENTIFIER: _decode_authority_key_identifier,
    ExtensionOID.ISSUER_ALTERNATIVE_NAME: _decode_issuer_alt_name,
    ExtensionOID.AUTHORITY_INFORMATION_ACCESS: (
        _decode_authority_information_access
    ),
    ExtensionOID.ISSUING_DISTRIBUTION_POINT: _decode_issuing_dist_point,
    ExtensionOID.FRESHEST_CRL: _decode_freshest_crl,
}

_OCSP_REQ_EXTENSION_HANDLERS = {
    OCSPExtensionOID.NONCE: _decode_nonce,
}

_OCSP_BASICRESP_EXTENSION_HANDLERS = {
    OCSPExtensionOID.NONCE: _decode_nonce,
}

_OCSP_SINGLERESP_EXTENSION_HANDLERS_SCT = {
    ExtensionOID.SIGNED_CERTIFICATE_TIMESTAMPS: (
        _decode_signed_certificate_timestamps
    )
}