1# This file is dual licensed under the terms of the Apache License, Version 2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 3# for complete details. 4 5from __future__ import absolute_import, division, print_function 6 7import datetime 8import ipaddress 9 10import asn1crypto.core 11 12import six 13 14from cryptography import x509 15from cryptography.x509.extensions import _TLS_FEATURE_TYPE_TO_ENUM 16from cryptography.x509.name import _ASN1_TYPE_TO_ENUM 17from cryptography.x509.oid import ( 18 CRLEntryExtensionOID, CertificatePoliciesOID, ExtensionOID, 19 OCSPExtensionOID, 20) 21 22 23class _Integers(asn1crypto.core.SequenceOf): 24 _child_spec = asn1crypto.core.Integer 25 26 27def _obj2txt(backend, obj): 28 # Set to 80 on the recommendation of 29 # https://www.openssl.org/docs/crypto/OBJ_nid2ln.html#return_values 30 # 31 # But OIDs longer than this occur in real life (e.g. Active 32 # Directory makes some very long OIDs). So we need to detect 33 # and properly handle the case where the default buffer is not 34 # big enough. 35 # 36 buf_len = 80 37 buf = backend._ffi.new("char[]", buf_len) 38 39 # 'res' is the number of bytes that *would* be written if the 40 # buffer is large enough. If 'res' > buf_len - 1, we need to 41 # alloc a big-enough buffer and go again. 42 res = backend._lib.OBJ_obj2txt(buf, buf_len, obj, 1) 43 if res > buf_len - 1: # account for terminating null byte 44 buf_len = res + 1 45 buf = backend._ffi.new("char[]", buf_len) 46 res = backend._lib.OBJ_obj2txt(buf, buf_len, obj, 1) 47 backend.openssl_assert(res > 0) 48 return backend._ffi.buffer(buf, res)[:].decode() 49 50 51def _decode_x509_name_entry(backend, x509_name_entry): 52 obj = backend._lib.X509_NAME_ENTRY_get_object(x509_name_entry) 53 backend.openssl_assert(obj != backend._ffi.NULL) 54 data = backend._lib.X509_NAME_ENTRY_get_data(x509_name_entry) 55 backend.openssl_assert(data != backend._ffi.NULL) 56 value = _asn1_string_to_utf8(backend, data) 57 oid = _obj2txt(backend, obj) 58 type = _ASN1_TYPE_TO_ENUM[data.type] 59 60 return x509.NameAttribute(x509.ObjectIdentifier(oid), value, type) 61 62 63def _decode_x509_name(backend, x509_name): 64 count = backend._lib.X509_NAME_entry_count(x509_name) 65 attributes = [] 66 prev_set_id = -1 67 for x in range(count): 68 entry = backend._lib.X509_NAME_get_entry(x509_name, x) 69 attribute = _decode_x509_name_entry(backend, entry) 70 set_id = backend._lib.Cryptography_X509_NAME_ENTRY_set(entry) 71 if set_id != prev_set_id: 72 attributes.append(set([attribute])) 73 else: 74 # is in the same RDN a previous entry 75 attributes[-1].add(attribute) 76 prev_set_id = set_id 77 78 return x509.Name(x509.RelativeDistinguishedName(rdn) for rdn in attributes) 79 80 81def _decode_general_names(backend, gns): 82 num = backend._lib.sk_GENERAL_NAME_num(gns) 83 names = [] 84 for i in range(num): 85 gn = backend._lib.sk_GENERAL_NAME_value(gns, i) 86 backend.openssl_assert(gn != backend._ffi.NULL) 87 names.append(_decode_general_name(backend, gn)) 88 89 return names 90 91 92def _decode_general_name(backend, gn): 93 if gn.type == backend._lib.GEN_DNS: 94 # Convert to bytes and then decode to utf8. We don't use 95 # asn1_string_to_utf8 here because it doesn't properly convert 96 # utf8 from ia5strings. 97 data = _asn1_string_to_bytes(backend, gn.d.dNSName).decode("utf8") 98 # We don't use the constructor for DNSName so we can bypass validation 99 # This allows us to create DNSName objects that have unicode chars 100 # when a certificate (against the RFC) contains them. 101 return x509.DNSName._init_without_validation(data) 102 elif gn.type == backend._lib.GEN_URI: 103 # Convert to bytes and then decode to utf8. We don't use 104 # asn1_string_to_utf8 here because it doesn't properly convert 105 # utf8 from ia5strings. 106 data = _asn1_string_to_bytes( 107 backend, gn.d.uniformResourceIdentifier 108 ).decode("utf8") 109 # We don't use the constructor for URI so we can bypass validation 110 # This allows us to create URI objects that have unicode chars 111 # when a certificate (against the RFC) contains them. 112 return x509.UniformResourceIdentifier._init_without_validation(data) 113 elif gn.type == backend._lib.GEN_RID: 114 oid = _obj2txt(backend, gn.d.registeredID) 115 return x509.RegisteredID(x509.ObjectIdentifier(oid)) 116 elif gn.type == backend._lib.GEN_IPADD: 117 data = _asn1_string_to_bytes(backend, gn.d.iPAddress) 118 data_len = len(data) 119 if data_len == 8 or data_len == 32: 120 # This is an IPv4 or IPv6 Network and not a single IP. This 121 # type of data appears in Name Constraints. Unfortunately, 122 # ipaddress doesn't support packed bytes + netmask. Additionally, 123 # IPv6Network can only handle CIDR rather than the full 16 byte 124 # netmask. To handle this we convert the netmask to integer, then 125 # find the first 0 bit, which will be the prefix. If another 1 126 # bit is present after that the netmask is invalid. 127 base = ipaddress.ip_address(data[:data_len // 2]) 128 netmask = ipaddress.ip_address(data[data_len // 2:]) 129 bits = bin(int(netmask))[2:] 130 prefix = bits.find('0') 131 # If no 0 bits are found it is a /32 or /128 132 if prefix == -1: 133 prefix = len(bits) 134 135 if "1" in bits[prefix:]: 136 raise ValueError("Invalid netmask") 137 138 ip = ipaddress.ip_network(base.exploded + u"/{0}".format(prefix)) 139 else: 140 ip = ipaddress.ip_address(data) 141 142 return x509.IPAddress(ip) 143 elif gn.type == backend._lib.GEN_DIRNAME: 144 return x509.DirectoryName( 145 _decode_x509_name(backend, gn.d.directoryName) 146 ) 147 elif gn.type == backend._lib.GEN_EMAIL: 148 # Convert to bytes and then decode to utf8. We don't use 149 # asn1_string_to_utf8 here because it doesn't properly convert 150 # utf8 from ia5strings. 151 data = _asn1_string_to_bytes(backend, gn.d.rfc822Name).decode("utf8") 152 # We don't use the constructor for RFC822Name so we can bypass 153 # validation. This allows us to create RFC822Name objects that have 154 # unicode chars when a certificate (against the RFC) contains them. 155 return x509.RFC822Name._init_without_validation(data) 156 elif gn.type == backend._lib.GEN_OTHERNAME: 157 type_id = _obj2txt(backend, gn.d.otherName.type_id) 158 value = _asn1_to_der(backend, gn.d.otherName.value) 159 return x509.OtherName(x509.ObjectIdentifier(type_id), value) 160 else: 161 # x400Address or ediPartyName 162 raise x509.UnsupportedGeneralNameType( 163 "{0} is not a supported type".format( 164 x509._GENERAL_NAMES.get(gn.type, gn.type) 165 ), 166 gn.type 167 ) 168 169 170def _decode_ocsp_no_check(backend, ext): 171 return x509.OCSPNoCheck() 172 173 174def _decode_crl_number(backend, ext): 175 asn1_int = backend._ffi.cast("ASN1_INTEGER *", ext) 176 asn1_int = backend._ffi.gc(asn1_int, backend._lib.ASN1_INTEGER_free) 177 return x509.CRLNumber(_asn1_integer_to_int(backend, asn1_int)) 178 179 180def _decode_delta_crl_indicator(backend, ext): 181 asn1_int = backend._ffi.cast("ASN1_INTEGER *", ext) 182 asn1_int = backend._ffi.gc(asn1_int, backend._lib.ASN1_INTEGER_free) 183 return x509.DeltaCRLIndicator(_asn1_integer_to_int(backend, asn1_int)) 184 185 186class _X509ExtensionParser(object): 187 def __init__(self, ext_count, get_ext, handlers): 188 self.ext_count = ext_count 189 self.get_ext = get_ext 190 self.handlers = handlers 191 192 def parse(self, backend, x509_obj): 193 extensions = [] 194 seen_oids = set() 195 for i in range(self.ext_count(backend, x509_obj)): 196 ext = self.get_ext(backend, x509_obj, i) 197 backend.openssl_assert(ext != backend._ffi.NULL) 198 crit = backend._lib.X509_EXTENSION_get_critical(ext) 199 critical = crit == 1 200 oid = x509.ObjectIdentifier( 201 _obj2txt(backend, backend._lib.X509_EXTENSION_get_object(ext)) 202 ) 203 if oid in seen_oids: 204 raise x509.DuplicateExtension( 205 "Duplicate {0} extension found".format(oid), oid 206 ) 207 208 # These OIDs are only supported in OpenSSL 1.1.0+ but we want 209 # to support them in all versions of OpenSSL so we decode them 210 # ourselves. 211 if oid == ExtensionOID.TLS_FEATURE: 212 data = backend._lib.X509_EXTENSION_get_data(ext) 213 parsed = _Integers.load(_asn1_string_to_bytes(backend, data)) 214 value = x509.TLSFeature( 215 [_TLS_FEATURE_TYPE_TO_ENUM[x.native] for x in parsed] 216 ) 217 extensions.append(x509.Extension(oid, critical, value)) 218 seen_oids.add(oid) 219 continue 220 elif oid == ExtensionOID.PRECERT_POISON: 221 data = backend._lib.X509_EXTENSION_get_data(ext) 222 parsed = asn1crypto.core.Null.load( 223 _asn1_string_to_bytes(backend, data) 224 ) 225 assert parsed == asn1crypto.core.Null() 226 extensions.append(x509.Extension( 227 oid, critical, x509.PrecertPoison() 228 )) 229 seen_oids.add(oid) 230 continue 231 232 try: 233 handler = self.handlers[oid] 234 except KeyError: 235 # Dump the DER payload into an UnrecognizedExtension object 236 data = backend._lib.X509_EXTENSION_get_data(ext) 237 backend.openssl_assert(data != backend._ffi.NULL) 238 der = backend._ffi.buffer(data.data, data.length)[:] 239 unrecognized = x509.UnrecognizedExtension(oid, der) 240 extensions.append( 241 x509.Extension(oid, critical, unrecognized) 242 ) 243 else: 244 ext_data = backend._lib.X509V3_EXT_d2i(ext) 245 if ext_data == backend._ffi.NULL: 246 backend._consume_errors() 247 raise ValueError( 248 "The {0} extension is invalid and can't be " 249 "parsed".format(oid) 250 ) 251 252 value = handler(backend, ext_data) 253 extensions.append(x509.Extension(oid, critical, value)) 254 255 seen_oids.add(oid) 256 257 return x509.Extensions(extensions) 258 259 260def _decode_certificate_policies(backend, cp): 261 cp = backend._ffi.cast("Cryptography_STACK_OF_POLICYINFO *", cp) 262 cp = backend._ffi.gc(cp, backend._lib.CERTIFICATEPOLICIES_free) 263 264 num = backend._lib.sk_POLICYINFO_num(cp) 265 certificate_policies = [] 266 for i in range(num): 267 qualifiers = None 268 pi = backend._lib.sk_POLICYINFO_value(cp, i) 269 oid = x509.ObjectIdentifier(_obj2txt(backend, pi.policyid)) 270 if pi.qualifiers != backend._ffi.NULL: 271 qnum = backend._lib.sk_POLICYQUALINFO_num(pi.qualifiers) 272 qualifiers = [] 273 for j in range(qnum): 274 pqi = backend._lib.sk_POLICYQUALINFO_value( 275 pi.qualifiers, j 276 ) 277 pqualid = x509.ObjectIdentifier( 278 _obj2txt(backend, pqi.pqualid) 279 ) 280 if pqualid == CertificatePoliciesOID.CPS_QUALIFIER: 281 cpsuri = backend._ffi.buffer( 282 pqi.d.cpsuri.data, pqi.d.cpsuri.length 283 )[:].decode('ascii') 284 qualifiers.append(cpsuri) 285 else: 286 assert pqualid == CertificatePoliciesOID.CPS_USER_NOTICE 287 user_notice = _decode_user_notice( 288 backend, pqi.d.usernotice 289 ) 290 qualifiers.append(user_notice) 291 292 certificate_policies.append( 293 x509.PolicyInformation(oid, qualifiers) 294 ) 295 296 return x509.CertificatePolicies(certificate_policies) 297 298 299def _decode_user_notice(backend, un): 300 explicit_text = None 301 notice_reference = None 302 303 if un.exptext != backend._ffi.NULL: 304 explicit_text = _asn1_string_to_utf8(backend, un.exptext) 305 306 if un.noticeref != backend._ffi.NULL: 307 organization = _asn1_string_to_utf8( 308 backend, un.noticeref.organization 309 ) 310 311 num = backend._lib.sk_ASN1_INTEGER_num( 312 un.noticeref.noticenos 313 ) 314 notice_numbers = [] 315 for i in range(num): 316 asn1_int = backend._lib.sk_ASN1_INTEGER_value( 317 un.noticeref.noticenos, i 318 ) 319 notice_num = _asn1_integer_to_int(backend, asn1_int) 320 notice_numbers.append(notice_num) 321 322 notice_reference = x509.NoticeReference( 323 organization, notice_numbers 324 ) 325 326 return x509.UserNotice(notice_reference, explicit_text) 327 328 329def _decode_basic_constraints(backend, bc_st): 330 basic_constraints = backend._ffi.cast("BASIC_CONSTRAINTS *", bc_st) 331 basic_constraints = backend._ffi.gc( 332 basic_constraints, backend._lib.BASIC_CONSTRAINTS_free 333 ) 334 # The byte representation of an ASN.1 boolean true is \xff. OpenSSL 335 # chooses to just map this to its ordinal value, so true is 255 and 336 # false is 0. 337 ca = basic_constraints.ca == 255 338 path_length = _asn1_integer_to_int_or_none( 339 backend, basic_constraints.pathlen 340 ) 341 342 return x509.BasicConstraints(ca, path_length) 343 344 345def _decode_subject_key_identifier(backend, asn1_string): 346 asn1_string = backend._ffi.cast("ASN1_OCTET_STRING *", asn1_string) 347 asn1_string = backend._ffi.gc( 348 asn1_string, backend._lib.ASN1_OCTET_STRING_free 349 ) 350 return x509.SubjectKeyIdentifier( 351 backend._ffi.buffer(asn1_string.data, asn1_string.length)[:] 352 ) 353 354 355def _decode_authority_key_identifier(backend, akid): 356 akid = backend._ffi.cast("AUTHORITY_KEYID *", akid) 357 akid = backend._ffi.gc(akid, backend._lib.AUTHORITY_KEYID_free) 358 key_identifier = None 359 authority_cert_issuer = None 360 361 if akid.keyid != backend._ffi.NULL: 362 key_identifier = backend._ffi.buffer( 363 akid.keyid.data, akid.keyid.length 364 )[:] 365 366 if akid.issuer != backend._ffi.NULL: 367 authority_cert_issuer = _decode_general_names( 368 backend, akid.issuer 369 ) 370 371 authority_cert_serial_number = _asn1_integer_to_int_or_none( 372 backend, akid.serial 373 ) 374 375 return x509.AuthorityKeyIdentifier( 376 key_identifier, authority_cert_issuer, authority_cert_serial_number 377 ) 378 379 380def _decode_authority_information_access(backend, aia): 381 aia = backend._ffi.cast("Cryptography_STACK_OF_ACCESS_DESCRIPTION *", aia) 382 aia = backend._ffi.gc(aia, backend._lib.sk_ACCESS_DESCRIPTION_free) 383 num = backend._lib.sk_ACCESS_DESCRIPTION_num(aia) 384 access_descriptions = [] 385 for i in range(num): 386 ad = backend._lib.sk_ACCESS_DESCRIPTION_value(aia, i) 387 backend.openssl_assert(ad.method != backend._ffi.NULL) 388 oid = x509.ObjectIdentifier(_obj2txt(backend, ad.method)) 389 backend.openssl_assert(ad.location != backend._ffi.NULL) 390 gn = _decode_general_name(backend, ad.location) 391 access_descriptions.append(x509.AccessDescription(oid, gn)) 392 393 return x509.AuthorityInformationAccess(access_descriptions) 394 395 396def _decode_key_usage(backend, bit_string): 397 bit_string = backend._ffi.cast("ASN1_BIT_STRING *", bit_string) 398 bit_string = backend._ffi.gc(bit_string, backend._lib.ASN1_BIT_STRING_free) 399 get_bit = backend._lib.ASN1_BIT_STRING_get_bit 400 digital_signature = get_bit(bit_string, 0) == 1 401 content_commitment = get_bit(bit_string, 1) == 1 402 key_encipherment = get_bit(bit_string, 2) == 1 403 data_encipherment = get_bit(bit_string, 3) == 1 404 key_agreement = get_bit(bit_string, 4) == 1 405 key_cert_sign = get_bit(bit_string, 5) == 1 406 crl_sign = get_bit(bit_string, 6) == 1 407 encipher_only = get_bit(bit_string, 7) == 1 408 decipher_only = get_bit(bit_string, 8) == 1 409 return x509.KeyUsage( 410 digital_signature, 411 content_commitment, 412 key_encipherment, 413 data_encipherment, 414 key_agreement, 415 key_cert_sign, 416 crl_sign, 417 encipher_only, 418 decipher_only 419 ) 420 421 422def _decode_general_names_extension(backend, gns): 423 gns = backend._ffi.cast("GENERAL_NAMES *", gns) 424 gns = backend._ffi.gc(gns, backend._lib.GENERAL_NAMES_free) 425 general_names = _decode_general_names(backend, gns) 426 return general_names 427 428 429def _decode_subject_alt_name(backend, ext): 430 return x509.SubjectAlternativeName( 431 _decode_general_names_extension(backend, ext) 432 ) 433 434 435def _decode_issuer_alt_name(backend, ext): 436 return x509.IssuerAlternativeName( 437 _decode_general_names_extension(backend, ext) 438 ) 439 440 441def _decode_name_constraints(backend, nc): 442 nc = backend._ffi.cast("NAME_CONSTRAINTS *", nc) 443 nc = backend._ffi.gc(nc, backend._lib.NAME_CONSTRAINTS_free) 444 permitted = _decode_general_subtrees(backend, nc.permittedSubtrees) 445 excluded = _decode_general_subtrees(backend, nc.excludedSubtrees) 446 return x509.NameConstraints( 447 permitted_subtrees=permitted, excluded_subtrees=excluded 448 ) 449 450 451def _decode_general_subtrees(backend, stack_subtrees): 452 if stack_subtrees == backend._ffi.NULL: 453 return None 454 455 num = backend._lib.sk_GENERAL_SUBTREE_num(stack_subtrees) 456 subtrees = [] 457 458 for i in range(num): 459 obj = backend._lib.sk_GENERAL_SUBTREE_value(stack_subtrees, i) 460 backend.openssl_assert(obj != backend._ffi.NULL) 461 name = _decode_general_name(backend, obj.base) 462 subtrees.append(name) 463 464 return subtrees 465 466 467def _decode_issuing_dist_point(backend, idp): 468 idp = backend._ffi.cast("ISSUING_DIST_POINT *", idp) 469 idp = backend._ffi.gc(idp, backend._lib.ISSUING_DIST_POINT_free) 470 if idp.distpoint != backend._ffi.NULL: 471 full_name, relative_name = _decode_distpoint(backend, idp.distpoint) 472 else: 473 full_name = None 474 relative_name = None 475 476 only_user = idp.onlyuser == 255 477 only_ca = idp.onlyCA == 255 478 indirect_crl = idp.indirectCRL == 255 479 only_attr = idp.onlyattr == 255 480 if idp.onlysomereasons != backend._ffi.NULL: 481 only_some_reasons = _decode_reasons(backend, idp.onlysomereasons) 482 else: 483 only_some_reasons = None 484 485 return x509.IssuingDistributionPoint( 486 full_name, relative_name, only_user, only_ca, only_some_reasons, 487 indirect_crl, only_attr 488 ) 489 490 491def _decode_policy_constraints(backend, pc): 492 pc = backend._ffi.cast("POLICY_CONSTRAINTS *", pc) 493 pc = backend._ffi.gc(pc, backend._lib.POLICY_CONSTRAINTS_free) 494 495 require_explicit_policy = _asn1_integer_to_int_or_none( 496 backend, pc.requireExplicitPolicy 497 ) 498 inhibit_policy_mapping = _asn1_integer_to_int_or_none( 499 backend, pc.inhibitPolicyMapping 500 ) 501 502 return x509.PolicyConstraints( 503 require_explicit_policy, inhibit_policy_mapping 504 ) 505 506 507def _decode_extended_key_usage(backend, sk): 508 sk = backend._ffi.cast("Cryptography_STACK_OF_ASN1_OBJECT *", sk) 509 sk = backend._ffi.gc(sk, backend._lib.sk_ASN1_OBJECT_free) 510 num = backend._lib.sk_ASN1_OBJECT_num(sk) 511 ekus = [] 512 513 for i in range(num): 514 obj = backend._lib.sk_ASN1_OBJECT_value(sk, i) 515 backend.openssl_assert(obj != backend._ffi.NULL) 516 oid = x509.ObjectIdentifier(_obj2txt(backend, obj)) 517 ekus.append(oid) 518 519 return x509.ExtendedKeyUsage(ekus) 520 521 522_DISTPOINT_TYPE_FULLNAME = 0 523_DISTPOINT_TYPE_RELATIVENAME = 1 524 525 526def _decode_dist_points(backend, cdps): 527 cdps = backend._ffi.cast("Cryptography_STACK_OF_DIST_POINT *", cdps) 528 cdps = backend._ffi.gc(cdps, backend._lib.CRL_DIST_POINTS_free) 529 530 num = backend._lib.sk_DIST_POINT_num(cdps) 531 dist_points = [] 532 for i in range(num): 533 full_name = None 534 relative_name = None 535 crl_issuer = None 536 reasons = None 537 cdp = backend._lib.sk_DIST_POINT_value(cdps, i) 538 if cdp.reasons != backend._ffi.NULL: 539 reasons = _decode_reasons(backend, cdp.reasons) 540 541 if cdp.CRLissuer != backend._ffi.NULL: 542 crl_issuer = _decode_general_names(backend, cdp.CRLissuer) 543 544 # Certificates may have a crl_issuer/reasons and no distribution 545 # point so make sure it's not null. 546 if cdp.distpoint != backend._ffi.NULL: 547 full_name, relative_name = _decode_distpoint( 548 backend, cdp.distpoint 549 ) 550 551 dist_points.append( 552 x509.DistributionPoint( 553 full_name, relative_name, reasons, crl_issuer 554 ) 555 ) 556 557 return dist_points 558 559 560# ReasonFlags ::= BIT STRING { 561# unused (0), 562# keyCompromise (1), 563# cACompromise (2), 564# affiliationChanged (3), 565# superseded (4), 566# cessationOfOperation (5), 567# certificateHold (6), 568# privilegeWithdrawn (7), 569# aACompromise (8) } 570_REASON_BIT_MAPPING = { 571 1: x509.ReasonFlags.key_compromise, 572 2: x509.ReasonFlags.ca_compromise, 573 3: x509.ReasonFlags.affiliation_changed, 574 4: x509.ReasonFlags.superseded, 575 5: x509.ReasonFlags.cessation_of_operation, 576 6: x509.ReasonFlags.certificate_hold, 577 7: x509.ReasonFlags.privilege_withdrawn, 578 8: x509.ReasonFlags.aa_compromise, 579} 580 581 582def _decode_reasons(backend, reasons): 583 # We will check each bit from RFC 5280 584 enum_reasons = [] 585 for bit_position, reason in six.iteritems(_REASON_BIT_MAPPING): 586 if backend._lib.ASN1_BIT_STRING_get_bit(reasons, bit_position): 587 enum_reasons.append(reason) 588 589 return frozenset(enum_reasons) 590 591 592def _decode_distpoint(backend, distpoint): 593 if distpoint.type == _DISTPOINT_TYPE_FULLNAME: 594 full_name = _decode_general_names(backend, distpoint.name.fullname) 595 return full_name, None 596 597 # OpenSSL code doesn't test for a specific type for 598 # relativename, everything that isn't fullname is considered 599 # relativename. Per RFC 5280: 600 # 601 # DistributionPointName ::= CHOICE { 602 # fullName [0] GeneralNames, 603 # nameRelativeToCRLIssuer [1] RelativeDistinguishedName } 604 rns = distpoint.name.relativename 605 rnum = backend._lib.sk_X509_NAME_ENTRY_num(rns) 606 attributes = set() 607 for i in range(rnum): 608 rn = backend._lib.sk_X509_NAME_ENTRY_value( 609 rns, i 610 ) 611 backend.openssl_assert(rn != backend._ffi.NULL) 612 attributes.add( 613 _decode_x509_name_entry(backend, rn) 614 ) 615 616 relative_name = x509.RelativeDistinguishedName(attributes) 617 618 return None, relative_name 619 620 621def _decode_crl_distribution_points(backend, cdps): 622 dist_points = _decode_dist_points(backend, cdps) 623 return x509.CRLDistributionPoints(dist_points) 624 625 626def _decode_freshest_crl(backend, cdps): 627 dist_points = _decode_dist_points(backend, cdps) 628 return x509.FreshestCRL(dist_points) 629 630 631def _decode_inhibit_any_policy(backend, asn1_int): 632 asn1_int = backend._ffi.cast("ASN1_INTEGER *", asn1_int) 633 asn1_int = backend._ffi.gc(asn1_int, backend._lib.ASN1_INTEGER_free) 634 skip_certs = _asn1_integer_to_int(backend, asn1_int) 635 return x509.InhibitAnyPolicy(skip_certs) 636 637 638def _decode_precert_signed_certificate_timestamps(backend, asn1_scts): 639 from cryptography.hazmat.backends.openssl.x509 import ( 640 _SignedCertificateTimestamp 641 ) 642 asn1_scts = backend._ffi.cast("Cryptography_STACK_OF_SCT *", asn1_scts) 643 asn1_scts = backend._ffi.gc(asn1_scts, backend._lib.SCT_LIST_free) 644 645 scts = [] 646 for i in range(backend._lib.sk_SCT_num(asn1_scts)): 647 sct = backend._lib.sk_SCT_value(asn1_scts, i) 648 649 scts.append(_SignedCertificateTimestamp(backend, asn1_scts, sct)) 650 return x509.PrecertificateSignedCertificateTimestamps(scts) 651 652 653# CRLReason ::= ENUMERATED { 654# unspecified (0), 655# keyCompromise (1), 656# cACompromise (2), 657# affiliationChanged (3), 658# superseded (4), 659# cessationOfOperation (5), 660# certificateHold (6), 661# -- value 7 is not used 662# removeFromCRL (8), 663# privilegeWithdrawn (9), 664# aACompromise (10) } 665_CRL_ENTRY_REASON_CODE_TO_ENUM = { 666 0: x509.ReasonFlags.unspecified, 667 1: x509.ReasonFlags.key_compromise, 668 2: x509.ReasonFlags.ca_compromise, 669 3: x509.ReasonFlags.affiliation_changed, 670 4: x509.ReasonFlags.superseded, 671 5: x509.ReasonFlags.cessation_of_operation, 672 6: x509.ReasonFlags.certificate_hold, 673 8: x509.ReasonFlags.remove_from_crl, 674 9: x509.ReasonFlags.privilege_withdrawn, 675 10: x509.ReasonFlags.aa_compromise, 676} 677 678 679_CRL_ENTRY_REASON_ENUM_TO_CODE = { 680 x509.ReasonFlags.unspecified: 0, 681 x509.ReasonFlags.key_compromise: 1, 682 x509.ReasonFlags.ca_compromise: 2, 683 x509.ReasonFlags.affiliation_changed: 3, 684 x509.ReasonFlags.superseded: 4, 685 x509.ReasonFlags.cessation_of_operation: 5, 686 x509.ReasonFlags.certificate_hold: 6, 687 x509.ReasonFlags.remove_from_crl: 8, 688 x509.ReasonFlags.privilege_withdrawn: 9, 689 x509.ReasonFlags.aa_compromise: 10 690} 691 692 693def _decode_crl_reason(backend, enum): 694 enum = backend._ffi.cast("ASN1_ENUMERATED *", enum) 695 enum = backend._ffi.gc(enum, backend._lib.ASN1_ENUMERATED_free) 696 code = backend._lib.ASN1_ENUMERATED_get(enum) 697 698 try: 699 return x509.CRLReason(_CRL_ENTRY_REASON_CODE_TO_ENUM[code]) 700 except KeyError: 701 raise ValueError("Unsupported reason code: {0}".format(code)) 702 703 704def _decode_invalidity_date(backend, inv_date): 705 generalized_time = backend._ffi.cast( 706 "ASN1_GENERALIZEDTIME *", inv_date 707 ) 708 generalized_time = backend._ffi.gc( 709 generalized_time, backend._lib.ASN1_GENERALIZEDTIME_free 710 ) 711 return x509.InvalidityDate( 712 _parse_asn1_generalized_time(backend, generalized_time) 713 ) 714 715 716def _decode_cert_issuer(backend, gns): 717 gns = backend._ffi.cast("GENERAL_NAMES *", gns) 718 gns = backend._ffi.gc(gns, backend._lib.GENERAL_NAMES_free) 719 general_names = _decode_general_names(backend, gns) 720 return x509.CertificateIssuer(general_names) 721 722 723def _asn1_to_der(backend, asn1_type): 724 buf = backend._ffi.new("unsigned char **") 725 res = backend._lib.i2d_ASN1_TYPE(asn1_type, buf) 726 backend.openssl_assert(res >= 0) 727 backend.openssl_assert(buf[0] != backend._ffi.NULL) 728 buf = backend._ffi.gc( 729 buf, lambda buffer: backend._lib.OPENSSL_free(buffer[0]) 730 ) 731 return backend._ffi.buffer(buf[0], res)[:] 732 733 734def _asn1_integer_to_int(backend, asn1_int): 735 bn = backend._lib.ASN1_INTEGER_to_BN(asn1_int, backend._ffi.NULL) 736 backend.openssl_assert(bn != backend._ffi.NULL) 737 bn = backend._ffi.gc(bn, backend._lib.BN_free) 738 return backend._bn_to_int(bn) 739 740 741def _asn1_integer_to_int_or_none(backend, asn1_int): 742 if asn1_int == backend._ffi.NULL: 743 return None 744 else: 745 return _asn1_integer_to_int(backend, asn1_int) 746 747 748def _asn1_string_to_bytes(backend, asn1_string): 749 return backend._ffi.buffer(asn1_string.data, asn1_string.length)[:] 750 751 752def _asn1_string_to_ascii(backend, asn1_string): 753 return _asn1_string_to_bytes(backend, asn1_string).decode("ascii") 754 755 756def _asn1_string_to_utf8(backend, asn1_string): 757 buf = backend._ffi.new("unsigned char **") 758 res = backend._lib.ASN1_STRING_to_UTF8(buf, asn1_string) 759 if res == -1: 760 raise ValueError( 761 "Unsupported ASN1 string type. Type: {0}".format(asn1_string.type) 762 ) 763 764 backend.openssl_assert(buf[0] != backend._ffi.NULL) 765 buf = backend._ffi.gc( 766 buf, lambda buffer: backend._lib.OPENSSL_free(buffer[0]) 767 ) 768 return backend._ffi.buffer(buf[0], res)[:].decode('utf8') 769 770 771def _parse_asn1_time(backend, asn1_time): 772 backend.openssl_assert(asn1_time != backend._ffi.NULL) 773 generalized_time = backend._lib.ASN1_TIME_to_generalizedtime( 774 asn1_time, backend._ffi.NULL 775 ) 776 if generalized_time == backend._ffi.NULL: 777 raise ValueError( 778 "Couldn't parse ASN.1 time as generalizedtime {!r}".format( 779 _asn1_string_to_bytes(backend, asn1_time) 780 ) 781 ) 782 783 generalized_time = backend._ffi.gc( 784 generalized_time, backend._lib.ASN1_GENERALIZEDTIME_free 785 ) 786 return _parse_asn1_generalized_time(backend, generalized_time) 787 788 789def _parse_asn1_generalized_time(backend, generalized_time): 790 time = _asn1_string_to_ascii( 791 backend, backend._ffi.cast("ASN1_STRING *", generalized_time) 792 ) 793 return datetime.datetime.strptime(time, "%Y%m%d%H%M%SZ") 794 795 796def _decode_nonce(backend, nonce): 797 nonce = backend._ffi.cast("ASN1_OCTET_STRING *", nonce) 798 nonce = backend._ffi.gc(nonce, backend._lib.ASN1_OCTET_STRING_free) 799 return x509.OCSPNonce(_asn1_string_to_bytes(backend, nonce)) 800 801 802_EXTENSION_HANDLERS_NO_SCT = { 803 ExtensionOID.BASIC_CONSTRAINTS: _decode_basic_constraints, 804 ExtensionOID.SUBJECT_KEY_IDENTIFIER: _decode_subject_key_identifier, 805 ExtensionOID.KEY_USAGE: _decode_key_usage, 806 ExtensionOID.SUBJECT_ALTERNATIVE_NAME: _decode_subject_alt_name, 807 ExtensionOID.EXTENDED_KEY_USAGE: _decode_extended_key_usage, 808 ExtensionOID.AUTHORITY_KEY_IDENTIFIER: _decode_authority_key_identifier, 809 ExtensionOID.AUTHORITY_INFORMATION_ACCESS: ( 810 _decode_authority_information_access 811 ), 812 ExtensionOID.CERTIFICATE_POLICIES: _decode_certificate_policies, 813 ExtensionOID.CRL_DISTRIBUTION_POINTS: _decode_crl_distribution_points, 814 ExtensionOID.FRESHEST_CRL: _decode_freshest_crl, 815 ExtensionOID.OCSP_NO_CHECK: _decode_ocsp_no_check, 816 ExtensionOID.INHIBIT_ANY_POLICY: _decode_inhibit_any_policy, 817 ExtensionOID.ISSUER_ALTERNATIVE_NAME: _decode_issuer_alt_name, 818 ExtensionOID.NAME_CONSTRAINTS: _decode_name_constraints, 819 ExtensionOID.POLICY_CONSTRAINTS: _decode_policy_constraints, 820} 821_EXTENSION_HANDLERS = _EXTENSION_HANDLERS_NO_SCT.copy() 822_EXTENSION_HANDLERS[ 823 ExtensionOID.PRECERT_SIGNED_CERTIFICATE_TIMESTAMPS 824] = _decode_precert_signed_certificate_timestamps 825 826 827_REVOKED_EXTENSION_HANDLERS = { 828 CRLEntryExtensionOID.CRL_REASON: _decode_crl_reason, 829 CRLEntryExtensionOID.INVALIDITY_DATE: _decode_invalidity_date, 830 CRLEntryExtensionOID.CERTIFICATE_ISSUER: _decode_cert_issuer, 831} 832 833_CRL_EXTENSION_HANDLERS = { 834 ExtensionOID.CRL_NUMBER: _decode_crl_number, 835 ExtensionOID.DELTA_CRL_INDICATOR: _decode_delta_crl_indicator, 836 ExtensionOID.AUTHORITY_KEY_IDENTIFIER: _decode_authority_key_identifier, 837 ExtensionOID.ISSUER_ALTERNATIVE_NAME: _decode_issuer_alt_name, 838 ExtensionOID.AUTHORITY_INFORMATION_ACCESS: ( 839 _decode_authority_information_access 840 ), 841 ExtensionOID.ISSUING_DISTRIBUTION_POINT: _decode_issuing_dist_point, 842} 843 844_OCSP_REQ_EXTENSION_HANDLERS = { 845 OCSPExtensionOID.NONCE: _decode_nonce, 846} 847 848_OCSP_BASICRESP_EXTENSION_HANDLERS = { 849 OCSPExtensionOID.NONCE: _decode_nonce, 850} 851 852_CERTIFICATE_EXTENSION_PARSER_NO_SCT = _X509ExtensionParser( 853 ext_count=lambda backend, x: backend._lib.X509_get_ext_count(x), 854 get_ext=lambda backend, x, i: backend._lib.X509_get_ext(x, i), 855 handlers=_EXTENSION_HANDLERS_NO_SCT 856) 857 858_CERTIFICATE_EXTENSION_PARSER = _X509ExtensionParser( 859 ext_count=lambda backend, x: backend._lib.X509_get_ext_count(x), 860 get_ext=lambda backend, x, i: backend._lib.X509_get_ext(x, i), 861 handlers=_EXTENSION_HANDLERS 862) 863 864_CSR_EXTENSION_PARSER = _X509ExtensionParser( 865 ext_count=lambda backend, x: backend._lib.sk_X509_EXTENSION_num(x), 866 get_ext=lambda backend, x, i: backend._lib.sk_X509_EXTENSION_value(x, i), 867 handlers=_EXTENSION_HANDLERS 868) 869 870_REVOKED_CERTIFICATE_EXTENSION_PARSER = _X509ExtensionParser( 871 ext_count=lambda backend, x: backend._lib.X509_REVOKED_get_ext_count(x), 872 get_ext=lambda backend, x, i: backend._lib.X509_REVOKED_get_ext(x, i), 873 handlers=_REVOKED_EXTENSION_HANDLERS, 874) 875 876_CRL_EXTENSION_PARSER = _X509ExtensionParser( 877 ext_count=lambda backend, x: backend._lib.X509_CRL_get_ext_count(x), 878 get_ext=lambda backend, x, i: backend._lib.X509_CRL_get_ext(x, i), 879 handlers=_CRL_EXTENSION_HANDLERS, 880) 881 882_OCSP_REQ_EXT_PARSER = _X509ExtensionParser( 883 ext_count=lambda backend, x: backend._lib.OCSP_REQUEST_get_ext_count(x), 884 get_ext=lambda backend, x, i: backend._lib.OCSP_REQUEST_get_ext(x, i), 885 handlers=_OCSP_REQ_EXTENSION_HANDLERS, 886) 887 888_OCSP_BASICRESP_EXT_PARSER = _X509ExtensionParser( 889 ext_count=lambda backend, x: backend._lib.OCSP_BASICRESP_get_ext_count(x), 890 get_ext=lambda backend, x, i: backend._lib.OCSP_BASICRESP_get_ext(x, i), 891 handlers=_OCSP_BASICRESP_EXTENSION_HANDLERS, 892) 893