# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.

from __future__ import absolute_import, division, print_function

import abc
import datetime
from enum import Enum

import six

from cryptography import x509
from cryptography.hazmat.primitives import hashes
from cryptography.x509.base import (
    _EARLIEST_UTC_TIME,
    _convert_to_naive_utc_time,
    _reject_duplicate_extension,
)


# Dotted-string object identifiers mapped to an instance of the matching
# hash algorithm. Not referenced elsewhere in this module.
# NOTE(review): presumably consumed by the backend when parsing -- confirm.
_OIDS_TO_HASH = {
    "1.3.14.3.2.26": hashes.SHA1(),
    "2.16.840.1.101.3.4.2.4": hashes.SHA224(),
    "2.16.840.1.101.3.4.2.1": hashes.SHA256(),
    "2.16.840.1.101.3.4.2.2": hashes.SHA384(),
    "2.16.840.1.101.3.4.2.3": hashes.SHA512(),
}


class OCSPResponderEncoding(Enum):
    """
    How the responder is identified in a response (see
    OCSPResponseBuilder.responder_id).
    """

    HASH = "By Hash"
    NAME = "By Name"


class OCSPResponseStatus(Enum):
    """
    Overall status of an OCSP response.
    """

    SUCCESSFUL = 0
    MALFORMED_REQUEST = 1
    INTERNAL_ERROR = 2
    TRY_LATER = 3
    # The value 4 is intentionally absent: RFC 6960 marks it as not used.
    SIG_REQUIRED = 5
    UNAUTHORIZED = 6


# Reverse lookup: integer status value -> OCSPResponseStatus member.
_RESPONSE_STATUS_TO_ENUM = {x.value: x for x in OCSPResponseStatus}

# Hash algorithm classes accepted by _verify_algorithm.
_ALLOWED_HASHES = (
    hashes.SHA1,
    hashes.SHA224,
    hashes.SHA256,
    hashes.SHA384,
    hashes.SHA512,
)


def _verify_algorithm(algorithm):
    """
    Raise ValueError unless ``algorithm`` is an instance of one of the
    classes listed in ``_ALLOWED_HASHES``. Returns None on success.
    """
    if not isinstance(algorithm, _ALLOWED_HASHES):
        raise ValueError(
            "Algorithm must be SHA1, SHA224, SHA256, SHA384, or SHA512"
        )


class OCSPCertStatus(Enum):
    """
    Status of a single certificate as reported in an OCSP response.
    """

    GOOD = 0
    REVOKED = 1
    UNKNOWN = 2


# Reverse lookup: integer status value -> OCSPCertStatus member.
_CERT_STATUS_TO_ENUM = {x.value: x for x in OCSPCertStatus}


def load_der_ocsp_request(data):
    """
    Hand ``data`` to the OpenSSL backend's ``load_der_ocsp_request`` and
    return its result.
    """
    # Imported at call time rather than at module import time.
    # NOTE(review): presumably to avoid an import cycle -- confirm.
    from cryptography.hazmat.backends.openssl.backend import backend

    return backend.load_der_ocsp_request(data)


def load_der_ocsp_response(data):
    """
    Hand ``data`` to the OpenSSL backend's ``load_der_ocsp_response`` and
    return its result.
    """
    # Imported at call time rather than at module import time.
    # NOTE(review): presumably to avoid an import cycle -- confirm.
    from cryptography.hazmat.backends.openssl.backend import backend

    return backend.load_der_ocsp_response(data)


class OCSPRequestBuilder(object):
    """
    Builder for an OCSP request.

    The ``add_*`` methods never modify the builder they are called on; each
    one returns a new OCSPRequestBuilder carrying the additional state.
    """

    def __init__(self, request=None, extensions=None):
        """
        :param request: None, or the ``(cert, issuer, algorithm)`` tuple
            stored by add_certificate.
        :param extensions: None (meaning no extensions), or a list of
            x509.Extension objects.
        """
        self._request = request
        # Use a fresh list per instance instead of a mutable default
        # argument, so default-constructed builders never share state.
        self._extensions = [] if extensions is None else extensions

    def add_certificate(self, cert, issuer, algorithm):
        """
        Return a new builder that requests the status of ``cert``.

        :raises ValueError: if a certificate was already added, or if
            ``algorithm`` is not one of the allowed hashes.
        :raises TypeError: if ``cert`` or ``issuer`` is not an
            x509.Certificate.
        """
        if self._request is not None:
            raise ValueError("Only one certificate can be added to a request")

        _verify_algorithm(algorithm)
        if not isinstance(cert, x509.Certificate) or not isinstance(
            issuer, x509.Certificate
        ):
            raise TypeError("cert and issuer must be a Certificate")

        return OCSPRequestBuilder((cert, issuer, algorithm), self._extensions)

    def add_extension(self, extension, critical):
        """
        Return a new builder with ``extension`` appended.

        :raises TypeError: if ``extension`` is not an x509.ExtensionType.

        Duplicate detection is delegated to _reject_duplicate_extension.
        """
        if not isinstance(extension, x509.ExtensionType):
            raise TypeError("extension must be an ExtensionType")

        extension = x509.Extension(extension.oid, critical, extension)
        _reject_duplicate_extension(extension, self._extensions)

        # Concatenation builds a new list; this builder's list is untouched.
        return OCSPRequestBuilder(
            self._request, self._extensions + [extension]
        )

    def build(self):
        """
        Pass this builder to the backend's ``create_ocsp_request`` and
        return its result.

        :raises ValueError: if no certificate has been added.
        """
        from cryptography.hazmat.backends.openssl.backend import backend

        if self._request is None:
            raise ValueError("You must add a certificate before building")

        return backend.create_ocsp_request(self)


class _SingleResponse(object):
    """
    Validated container for the data of one single response.

    All validation happens in the constructor; the values are stored on
    underscore-prefixed attributes of the same names.
    """

    def __init__(
        self,
        cert,
        issuer,
        algorithm,
        cert_status,
        this_update,
        next_update,
        revocation_time,
        revocation_reason,
    ):
        """
        :raises TypeError: if an argument has the wrong type.
        :raises ValueError: if ``algorithm`` is not allowed, if revocation
            data is supplied for a certificate that is not revoked, or if
            ``revocation_time`` is earlier than ``_EARLIEST_UTC_TIME``.
        """
        if not isinstance(cert, x509.Certificate) or not isinstance(
            issuer, x509.Certificate
        ):
            raise TypeError("cert and issuer must be a Certificate")

        _verify_algorithm(algorithm)
        if not isinstance(this_update, datetime.datetime):
            raise TypeError("this_update must be a datetime object")
        if next_update is not None and not isinstance(
            next_update, datetime.datetime
        ):
            raise TypeError("next_update must be a datetime object or None")

        self._cert = cert
        self._issuer = issuer
        self._algorithm = algorithm
        self._this_update = this_update
        self._next_update = next_update

        if not isinstance(cert_status, OCSPCertStatus):
            raise TypeError(
                "cert_status must be an item from the OCSPCertStatus enum"
            )
        if cert_status is not OCSPCertStatus.REVOKED:
            # Revocation details are only meaningful for revoked certs.
            if revocation_time is not None:
                raise ValueError(
                    "revocation_time can only be provided if the certificate "
                    "is revoked"
                )
            if revocation_reason is not None:
                raise ValueError(
                    "revocation_reason can only be provided if the certificate"
                    " is revoked"
                )
        else:
            # A revoked cert requires a time; the reason stays optional.
            if not isinstance(revocation_time, datetime.datetime):
                raise TypeError("revocation_time must be a datetime object")

            revocation_time = _convert_to_naive_utc_time(revocation_time)
            if revocation_time < _EARLIEST_UTC_TIME:
                raise ValueError(
                    "The revocation_time must be on or after"
                    " 1950 January 1."
                )

            if revocation_reason is not None and not isinstance(
                revocation_reason, x509.ReasonFlags
            ):
                raise TypeError(
                    "revocation_reason must be an item from the ReasonFlags "
                    "enum or None"
                )

        self._cert_status = cert_status
        self._revocation_time = revocation_time
        self._revocation_reason = revocation_reason


class OCSPResponseBuilder(object):
    """
    Builder for an OCSP response.

    The configuration methods never modify the builder they are called on;
    each one returns a new OCSPResponseBuilder carrying the additional
    state.
    """

    def __init__(
        self, response=None, responder_id=None, certs=None, extensions=None
    ):
        """
        :param response: None, or the _SingleResponse stored by
            add_response.
        :param responder_id: None, or the ``(responder_cert, encoding)``
            tuple stored by responder_id().
        :param certs: None, or a list of x509.Certificate objects.
        :param extensions: None (meaning no extensions), or a list of
            x509.Extension objects.
        """
        self._response = response
        self._responder_id = responder_id
        self._certs = certs
        # Use a fresh list per instance instead of a mutable default
        # argument, so default-constructed builders never share state.
        self._extensions = [] if extensions is None else extensions

    def add_response(
        self,
        cert,
        issuer,
        algorithm,
        cert_status,
        this_update,
        next_update,
        revocation_time,
        revocation_reason,
    ):
        """
        Return a new builder holding a single response built from the
        arguments. Validation is performed by _SingleResponse.

        :raises ValueError: if a response was already added.
        """
        if self._response is not None:
            raise ValueError("Only one response per OCSPResponse.")

        singleresp = _SingleResponse(
            cert,
            issuer,
            algorithm,
            cert_status,
            this_update,
            next_update,
            revocation_time,
            revocation_reason,
        )
        return OCSPResponseBuilder(
            singleresp,
            self._responder_id,
            self._certs,
            self._extensions,
        )

    def responder_id(self, encoding, responder_cert):
        """
        Return a new builder with the responder identity set.

        :raises ValueError: if the responder id was already set.
        :raises TypeError: if ``responder_cert`` is not an x509.Certificate
            or ``encoding`` is not an OCSPResponderEncoding member.
        """
        if self._responder_id is not None:
            raise ValueError("responder_id can only be set once")
        if not isinstance(responder_cert, x509.Certificate):
            raise TypeError("responder_cert must be a Certificate")
        if not isinstance(encoding, OCSPResponderEncoding):
            raise TypeError(
                "encoding must be an element from OCSPResponderEncoding"
            )

        # Stored as (cert, encoding): the reverse of the parameter order.
        return OCSPResponseBuilder(
            self._response,
            (responder_cert, encoding),
            self._certs,
            self._extensions,
        )

    def certificates(self, certs):
        """
        Return a new builder carrying ``certs`` (any iterable of
        x509.Certificate; it is copied into a list).

        :raises ValueError: if certificates were already set or ``certs``
            is empty.
        :raises TypeError: if any element is not an x509.Certificate.
        """
        if self._certs is not None:
            raise ValueError("certificates may only be set once")
        certs = list(certs)
        if len(certs) == 0:
            raise ValueError("certs must not be an empty list")
        if not all(isinstance(x, x509.Certificate) for x in certs):
            raise TypeError("certs must be a list of Certificates")
        return OCSPResponseBuilder(
            self._response,
            self._responder_id,
            certs,
            self._extensions,
        )

    def add_extension(self, extension, critical):
        """
        Return a new builder with ``extension`` appended.

        :raises TypeError: if ``extension`` is not an x509.ExtensionType.

        Duplicate detection is delegated to _reject_duplicate_extension.
        """
        if not isinstance(extension, x509.ExtensionType):
            raise TypeError("extension must be an ExtensionType")

        extension = x509.Extension(extension.oid, critical, extension)
        _reject_duplicate_extension(extension, self._extensions)

        # Concatenation builds a new list; this builder's list is untouched.
        return OCSPResponseBuilder(
            self._response,
            self._responder_id,
            self._certs,
            self._extensions + [extension],
        )

    def sign(self, private_key, algorithm):
        """
        Pass this builder, ``private_key`` and ``algorithm`` to the
        backend's ``create_ocsp_response`` with a SUCCESSFUL status and
        return its result.

        :raises ValueError: if no response or no responder id was added.
        """
        from cryptography.hazmat.backends.openssl.backend import backend

        if self._response is None:
            raise ValueError("You must add a response before signing")
        if self._responder_id is None:
            raise ValueError("You must add a responder_id before signing")

        return backend.create_ocsp_response(
            OCSPResponseStatus.SUCCESSFUL, self, private_key, algorithm
        )

    @classmethod
    def build_unsuccessful(cls, response_status):
        """
        Ask the backend to create a response that carries only
        ``response_status`` (no builder, key or algorithm is passed).

        :raises TypeError: if ``response_status`` is not an
            OCSPResponseStatus member.
        :raises ValueError: if ``response_status`` is SUCCESSFUL.
        """
        from cryptography.hazmat.backends.openssl.backend import backend

        if not isinstance(response_status, OCSPResponseStatus):
            raise TypeError(
                "response_status must be an item from OCSPResponseStatus"
            )
        if response_status is OCSPResponseStatus.SUCCESSFUL:
            raise ValueError("response_status cannot be SUCCESSFUL")

        return backend.create_ocsp_response(response_status, None, None, None)


@six.add_metaclass(abc.ABCMeta)
class OCSPRequest(object):
    """
    Abstract interface of an OCSP request.
    """

    @abc.abstractproperty
    def issuer_key_hash(self):
        """
        The hash of the issuer public key
        """

    @abc.abstractproperty
    def issuer_name_hash(self):
        """
        The hash of the issuer name
        """

    @abc.abstractproperty
    def hash_algorithm(self):
        """
        The hash algorithm used in the issuer name and key hashes
        """

    @abc.abstractproperty
    def serial_number(self):
        """
        The serial number of the cert whose status is being checked
        """

    @abc.abstractmethod
    def public_bytes(self, encoding):
        """
        Serializes the request to DER
        """

    @abc.abstractproperty
    def extensions(self):
        """
        The list of request extensions. Not single request extensions.
        """


@six.add_metaclass(abc.ABCMeta)
class OCSPResponse(object):
    """
    Abstract interface of an OCSP response.
    """

    @abc.abstractproperty
    def response_status(self):
        """
        The status of the response. This is a value from the OCSPResponseStatus
        enumeration
        """

    @abc.abstractproperty
    def signature_algorithm_oid(self):
        """
        The ObjectIdentifier of the signature algorithm
        """

    @abc.abstractproperty
    def signature_hash_algorithm(self):
        """
        Returns a HashAlgorithm corresponding to the type of the digest signed
        """

    @abc.abstractproperty
    def signature(self):
        """
        The signature bytes
        """

    @abc.abstractproperty
    def tbs_response_bytes(self):
        """
        The tbsResponseData bytes
        """

    @abc.abstractproperty
    def certificates(self):
        """
        A list of certificates used to help build a chain to verify the OCSP
        response. This situation occurs when the OCSP responder uses a delegate
        certificate.
        """

    @abc.abstractproperty
    def responder_key_hash(self):
        """
        The responder's key hash or None
        """

    @abc.abstractproperty
    def responder_name(self):
        """
        The responder's Name or None
        """

    @abc.abstractproperty
    def produced_at(self):
        """
        The time the response was produced
        """

    @abc.abstractproperty
    def certificate_status(self):
        """
        The status of the certificate (an element from the OCSPCertStatus enum)
        """

    @abc.abstractproperty
    def revocation_time(self):
        """
        The date of when the certificate was revoked or None if not
        revoked.
        """

    @abc.abstractproperty
    def revocation_reason(self):
        """
        The reason the certificate was revoked or None if not specified or
        not revoked.
        """

    @abc.abstractproperty
    def this_update(self):
        """
        The most recent time at which the status being indicated is known by
        the responder to have been correct
        """

    @abc.abstractproperty
    def next_update(self):
        """
        The time when newer information will be available
        """

    @abc.abstractproperty
    def issuer_key_hash(self):
        """
        The hash of the issuer public key
        """

    @abc.abstractproperty
    def issuer_name_hash(self):
        """
        The hash of the issuer name
        """

    @abc.abstractproperty
    def hash_algorithm(self):
        """
        The hash algorithm used in the issuer name and key hashes
        """

    @abc.abstractproperty
    def serial_number(self):
        """
        The serial number of the cert whose status is being checked
        """

    @abc.abstractproperty
    def extensions(self):
        """
        The list of response extensions. Not single response extensions.
        """

    @abc.abstractproperty
    def single_extensions(self):
        """
        The list of single response extensions. Not response extensions.
        """