# coding: utf-8

"""
ASN.1 type classes for the online certificate status protocol (OCSP). Exports
the following items:

 - OCSPRequest()
 - OCSPResponse()

Other type classes are defined that help compose the types listed above.
"""

from __future__ import unicode_literals, division, absolute_import, print_function

from ._errors import unwrap
from .algos import DigestAlgorithm, SignedDigestAlgorithm
from .core import (
    Boolean,
    Choice,
    Enumerated,
    GeneralizedTime,
    IA5String,
    Integer,
    Null,
    ObjectIdentifier,
    OctetBitString,
    OctetString,
    ParsableOctetString,
    Sequence,
    SequenceOf,
    Void,
)
from .crl import AuthorityInfoAccessSyntax, CRLReason
from .keys import PublicKeyAlgorithm
from .x509 import Certificate, GeneralName, GeneralNames, Name


# The structures in this file are taken from https://tools.ietf.org/html/rfc6960


def _cache_extensions(owner, extensions):
    """
    Walks a collection of extensions, storing the parsed value of every
    extension that `owner` has a matching "_<name>_value" attribute for, and
    recording the names of the extensions flagged as critical.

    Shared by the _set_extensions() method of every class in this module
    that exposes extension shortcut properties.

    :param owner:
        The object to set the "_<name>_value", "_critical_extensions" and
        "_processed_extensions" attributes on

    :param extensions:
        An iterable of extension sequences, each having "extn_id", "critical"
        and "extn_value" fields. May be empty.
    """

    owner._critical_extensions = set()

    for extension in extensions:
        name = extension['extn_id'].native
        attribute_name = '_%s_value' % name
        # Only extensions the owner declares a class-level placeholder for
        # are cached; anything else is skipped without being parsed
        if hasattr(owner, attribute_name):
            setattr(owner, attribute_name, extension['extn_value'].parsed)
        if extension['critical'].native:
            owner._critical_extensions.add(name)

    owner._processed_extensions = True


class Version(Integer):
    _map = {
        0: 'v1'
    }


class CertId(Sequence):
    _fields = [
        ('hash_algorithm', DigestAlgorithm),
        ('issuer_name_hash', OctetString),
        ('issuer_key_hash', OctetString),
        ('serial_number', Integer),
    ]


class ServiceLocator(Sequence):
    _fields = [
        ('issuer', Name),
        ('locator', AuthorityInfoAccessSyntax),
    ]


class RequestExtensionId(ObjectIdentifier):
    _map = {
        '1.3.6.1.5.5.7.48.1.7': 'service_locator',
    }


class RequestExtension(Sequence):
    _fields = [
        ('extn_id', RequestExtensionId),
        ('critical', Boolean, {'default': False}),
        ('extn_value', ParsableOctetString),
    ]

    # The value of extn_id selects the spec used to parse extn_value
    _oid_pair = ('extn_id', 'extn_value')
    _oid_specs = {
        'service_locator': ServiceLocator,
    }


class RequestExtensions(SequenceOf):
    _child_spec = RequestExtension


class Request(Sequence):
    _fields = [
        ('req_cert', CertId),
        ('single_request_extensions', RequestExtensions, {'explicit': 0, 'optional': True}),
    ]

    # Lazily populated by _set_extensions() the first time a property is read
    _processed_extensions = False
    _critical_extensions = None
    _service_locator_value = None

    def _set_extensions(self):
        """
        Sets common named extensions to private attributes and creates a set
        of critical extensions
        """

        _cache_extensions(self, self['single_request_extensions'])

    @property
    def critical_extensions(self):
        """
        Returns a set of the names (or OID if not a known extension) of the
        extensions marked as critical

        :return:
            A set of unicode strings
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._critical_extensions

    @property
    def service_locator_value(self):
        """
        This extension is used when communicating with an OCSP responder that
        acts as a proxy for OCSP requests

        :return:
            None or a ServiceLocator object
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._service_locator_value


class Requests(SequenceOf):
    _child_spec = Request


class ResponseType(ObjectIdentifier):
    _map = {
        '1.3.6.1.5.5.7.48.1.1': 'basic_ocsp_response',
    }


class AcceptableResponses(SequenceOf):
    _child_spec = ResponseType


class PreferredSignatureAlgorithm(Sequence):
    _fields = [
        ('sig_identifier', SignedDigestAlgorithm),
        ('cert_identifier', PublicKeyAlgorithm, {'optional': True}),
    ]


class PreferredSignatureAlgorithms(SequenceOf):
    _child_spec = PreferredSignatureAlgorithm


class TBSRequestExtensionId(ObjectIdentifier):
    _map = {
        '1.3.6.1.5.5.7.48.1.2': 'nonce',
        '1.3.6.1.5.5.7.48.1.4': 'acceptable_responses',
        '1.3.6.1.5.5.7.48.1.8': 'preferred_signature_algorithms',
    }


class TBSRequestExtension(Sequence):
    _fields = [
        ('extn_id', TBSRequestExtensionId),
        ('critical', Boolean, {'default': False}),
        ('extn_value', ParsableOctetString),
    ]

    # The value of extn_id selects the spec used to parse extn_value
    _oid_pair = ('extn_id', 'extn_value')
    _oid_specs = {
        'nonce': OctetString,
        'acceptable_responses': AcceptableResponses,
        'preferred_signature_algorithms': PreferredSignatureAlgorithms,
    }


class TBSRequestExtensions(SequenceOf):
    _child_spec = TBSRequestExtension


class TBSRequest(Sequence):
    _fields = [
        ('version', Version, {'explicit': 0, 'default': 'v1'}),
        ('requestor_name', GeneralName, {'explicit': 1, 'optional': True}),
        ('request_list', Requests),
        ('request_extensions', TBSRequestExtensions, {'explicit': 2, 'optional': True}),
    ]


class Certificates(SequenceOf):
    _child_spec = Certificate


class Signature(Sequence):
    _fields = [
        ('signature_algorithm', SignedDigestAlgorithm),
        ('signature', OctetBitString),
        ('certs', Certificates, {'explicit': 0, 'optional': True}),
    ]


class OCSPRequest(Sequence):
    _fields = [
        ('tbs_request', TBSRequest),
        ('optional_signature', Signature, {'explicit': 0, 'optional': True}),
    ]

    # Lazily populated by _set_extensions() the first time a property is read
    _processed_extensions = False
    _critical_extensions = None
    _nonce_value = None
    _acceptable_responses_value = None
    _preferred_signature_algorithms_value = None

    def _set_extensions(self):
        """
        Sets common named extensions to private attributes and creates a set
        of critical extensions
        """

        _cache_extensions(self, self['tbs_request']['request_extensions'])

    @property
    def critical_extensions(self):
        """
        Returns a set of the names (or OID if not a known extension) of the
        extensions marked as critical

        :return:
            A set of unicode strings
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._critical_extensions

    @property
    def nonce_value(self):
        """
        This extension is used to prevent replay attacks by including a unique,
        random value with each request/response pair

        :return:
            None or an OctetString object
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._nonce_value

    @property
    def acceptable_responses_value(self):
        """
        This extension is used to allow the client and server to communicate
        with alternative response formats other than just basic_ocsp_response,
        although no other formats are defined in the standard.

        :return:
            None or an AcceptableResponses object
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._acceptable_responses_value

    @property
    def preferred_signature_algorithms_value(self):
        """
        This extension is used by the client to define what signature algorithms
        are preferred, including both the hash algorithm and the public key
        algorithm, with a level of detail down to even the public key algorithm
        parameters, such as curve name.

        :return:
            None or a PreferredSignatureAlgorithms object
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._preferred_signature_algorithms_value


class OCSPResponseStatus(Enumerated):
    # The value 4 is not used by RFC 6960, so it has no entry here
    _map = {
        0: 'successful',
        1: 'malformed_request',
        2: 'internal_error',
        3: 'try_later',
        5: 'sign_required',
        6: 'unauthorized',
    }


class ResponderId(Choice):
    _alternatives = [
        ('by_name', Name, {'explicit': 1}),
        ('by_key', OctetString, {'explicit': 2}),
    ]


# Custom class to return a meaningful .native attribute from CertStatus()
class StatusGood(Null):
    def set(self, value):
        """
        Sets the value of the object

        :param value:
            None or 'good'

        :raises:
            ValueError - when value is not None, 'good' or a Null object
        """

        if value is not None and value != 'good' and not isinstance(value, Null):
            raise ValueError(unwrap(
                '''
                value must be one of None, "good", not %s
                ''',
                repr(value)
            ))

        self.contents = b''

    @property
    def native(self):
        return 'good'


# Custom class to return a meaningful .native attribute from CertStatus()
class StatusUnknown(Null):
    def set(self, value):
        """
        Sets the value of the object

        :param value:
            None or 'unknown'

        :raises:
            ValueError - when value is not None, 'unknown' or a Null object
        """

        if value is not None and value != 'unknown' and not isinstance(value, Null):
            raise ValueError(unwrap(
                '''
                value must be one of None, "unknown", not %s
                ''',
                repr(value)
            ))

        self.contents = b''

    @property
    def native(self):
        return 'unknown'


class RevokedInfo(Sequence):
    _fields = [
        ('revocation_time', GeneralizedTime),
        ('revocation_reason', CRLReason, {'explicit': 0, 'optional': True}),
    ]


class CertStatus(Choice):
    _alternatives = [
        ('good', StatusGood, {'implicit': 0}),
        ('revoked', RevokedInfo, {'implicit': 1}),
        ('unknown', StatusUnknown, {'implicit': 2}),
    ]


class CrlId(Sequence):
    _fields = [
        ('crl_url', IA5String, {'explicit': 0, 'optional': True}),
        ('crl_num', Integer, {'explicit': 1, 'optional': True}),
        ('crl_time', GeneralizedTime, {'explicit': 2, 'optional': True}),
    ]


class SingleResponseExtensionId(ObjectIdentifier):
    _map = {
        '1.3.6.1.5.5.7.48.1.3': 'crl',
        '1.3.6.1.5.5.7.48.1.6': 'archive_cutoff',
        # These are CRLEntryExtension values from
        # https://tools.ietf.org/html/rfc5280
        '2.5.29.21': 'crl_reason',
        '2.5.29.24': 'invalidity_date',
        '2.5.29.29': 'certificate_issuer',
        # https://tools.ietf.org/html/rfc6962.html#page-13
        '1.3.6.1.4.1.11129.2.4.5': 'signed_certificate_timestamp_list',
    }


class SingleResponseExtension(Sequence):
    _fields = [
        ('extn_id', SingleResponseExtensionId),
        ('critical', Boolean, {'default': False}),
        ('extn_value', ParsableOctetString),
    ]

    # The value of extn_id selects the spec used to parse extn_value
    _oid_pair = ('extn_id', 'extn_value')
    _oid_specs = {
        'crl': CrlId,
        'archive_cutoff': GeneralizedTime,
        'crl_reason': CRLReason,
        'invalidity_date': GeneralizedTime,
        'certificate_issuer': GeneralNames,
        'signed_certificate_timestamp_list': OctetString,
    }


class SingleResponseExtensions(SequenceOf):
    _child_spec = SingleResponseExtension


class SingleResponse(Sequence):
    _fields = [
        ('cert_id', CertId),
        ('cert_status', CertStatus),
        ('this_update', GeneralizedTime),
        ('next_update', GeneralizedTime, {'explicit': 0, 'optional': True}),
        ('single_extensions', SingleResponseExtensions, {'explicit': 1, 'optional': True}),
    ]

    # Lazily populated by _set_extensions() the first time a property is read.
    # There is no placeholder for signed_certificate_timestamp_list, so that
    # extension is only reachable through the single_extensions field.
    _processed_extensions = False
    _critical_extensions = None
    _crl_value = None
    _archive_cutoff_value = None
    _crl_reason_value = None
    _invalidity_date_value = None
    _certificate_issuer_value = None

    def _set_extensions(self):
        """
        Sets common named extensions to private attributes and creates a set
        of critical extensions
        """

        _cache_extensions(self, self['single_extensions'])

    @property
    def critical_extensions(self):
        """
        Returns a set of the names (or OID if not a known extension) of the
        extensions marked as critical

        :return:
            A set of unicode strings
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._critical_extensions

    @property
    def crl_value(self):
        """
        This extension is used to locate the CRL that a certificate's revocation
        is contained within.

        :return:
            None or a CrlId object
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._crl_value

    @property
    def archive_cutoff_value(self):
        """
        This extension is used to indicate the date at which an archived
        (historical) certificate status entry will no longer be available.

        :return:
            None or a GeneralizedTime object
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._archive_cutoff_value

    @property
    def crl_reason_value(self):
        """
        This extension indicates the reason that a certificate was revoked.

        :return:
            None or a CRLReason object
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._crl_reason_value

    @property
    def invalidity_date_value(self):
        """
        This extension indicates the suspected date/time the private key was
        compromised or the certificate became invalid. This would usually be
        before the revocation date, which is when the CA processed the
        revocation.

        :return:
            None or a GeneralizedTime object
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._invalidity_date_value

    @property
    def certificate_issuer_value(self):
        """
        This extension indicates the issuer of the certificate in question.

        :return:
            None or an x509.GeneralNames object
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._certificate_issuer_value


class Responses(SequenceOf):
    _child_spec = SingleResponse


class ResponseDataExtensionId(ObjectIdentifier):
    _map = {
        '1.3.6.1.5.5.7.48.1.2': 'nonce',
        '1.3.6.1.5.5.7.48.1.9': 'extended_revoke',
    }


class ResponseDataExtension(Sequence):
    _fields = [
        ('extn_id', ResponseDataExtensionId),
        ('critical', Boolean, {'default': False}),
        ('extn_value', ParsableOctetString),
    ]

    # The value of extn_id selects the spec used to parse extn_value
    _oid_pair = ('extn_id', 'extn_value')
    _oid_specs = {
        'nonce': OctetString,
        'extended_revoke': Null,
    }


class ResponseDataExtensions(SequenceOf):
    _child_spec = ResponseDataExtension


class ResponseData(Sequence):
    _fields = [
        ('version', Version, {'explicit': 0, 'default': 'v1'}),
        ('responder_id', ResponderId),
        ('produced_at', GeneralizedTime),
        ('responses', Responses),
        ('response_extensions', ResponseDataExtensions, {'explicit': 1, 'optional': True}),
    ]


class BasicOCSPResponse(Sequence):
    _fields = [
        ('tbs_response_data', ResponseData),
        ('signature_algorithm', SignedDigestAlgorithm),
        ('signature', OctetBitString),
        ('certs', Certificates, {'explicit': 0, 'optional': True}),
    ]


class ResponseBytes(Sequence):
    _fields = [
        ('response_type', ResponseType),
        ('response', ParsableOctetString),
    ]

    # The value of response_type selects the spec used to parse response
    _oid_pair = ('response_type', 'response')
    _oid_specs = {
        'basic_ocsp_response': BasicOCSPResponse,
    }


class OCSPResponse(Sequence):
    _fields = [
        ('response_status', OCSPResponseStatus),
        ('response_bytes', ResponseBytes, {'explicit': 0, 'optional': True}),
    ]

    # Lazily populated by _set_extensions() the first time a property is read
    _processed_extensions = False
    _critical_extensions = None
    _nonce_value = None
    _extended_revoke_value = None

    def _set_extensions(self):
        """
        Sets common named extensions to private attributes and creates a set
        of critical extensions. A response without response_bytes is treated
        as having no extensions.
        """

        response_data = self.response_data
        if response_data is None:
            extensions = ()
        else:
            extensions = response_data['response_extensions']

        _cache_extensions(self, extensions)

    @property
    def critical_extensions(self):
        """
        Returns a set of the names (or OID if not a known extension) of the
        extensions marked as critical

        :return:
            A set of unicode strings
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._critical_extensions

    @property
    def nonce_value(self):
        """
        This extension is used to prevent replay attacks on the request/response
        exchange

        :return:
            None or an OctetString object
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._nonce_value

    @property
    def extended_revoke_value(self):
        """
        This extension is used to signal that the responder will return a
        "revoked" status for non-issued certificates.

        :return:
            None or a Null object (if present)
        """

        if not self._processed_extensions:
            self._set_extensions()
        return self._extended_revoke_value

    @property
    def basic_ocsp_response(self):
        """
        A shortcut into the BasicOCSPResponse sequence

        :return:
            None or an asn1crypto.ocsp.BasicOCSPResponse object
        """

        response_bytes = self['response_bytes']
        # response_bytes is optional; subscripting the placeholder used for
        # an absent value would raise a TypeError instead of yielding None
        if isinstance(response_bytes, Void):
            return None
        return response_bytes['response'].parsed

    @property
    def response_data(self):
        """
        A shortcut into the parsed, ResponseData sequence

        :return:
            None or an asn1crypto.ocsp.ResponseData object
        """

        basic_response = self.basic_ocsp_response
        if basic_response is None:
            return None
        return basic_response['tbs_response_data']