1# This file is dual licensed under the terms of the Apache License, Version 2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 3# for complete details. 4 5from __future__ import absolute_import, division, print_function 6 7import abc 8import datetime 9from enum import Enum 10 11import six 12 13from cryptography import x509 14from cryptography.hazmat.primitives import hashes 15from cryptography.x509.base import ( 16 _EARLIEST_UTC_TIME, _convert_to_naive_utc_time, _reject_duplicate_extension 17) 18 19 20_OIDS_TO_HASH = { 21 "1.3.14.3.2.26": hashes.SHA1(), 22 "2.16.840.1.101.3.4.2.4": hashes.SHA224(), 23 "2.16.840.1.101.3.4.2.1": hashes.SHA256(), 24 "2.16.840.1.101.3.4.2.2": hashes.SHA384(), 25 "2.16.840.1.101.3.4.2.3": hashes.SHA512(), 26} 27 28 29class OCSPResponderEncoding(Enum): 30 HASH = "By Hash" 31 NAME = "By Name" 32 33 34class OCSPResponseStatus(Enum): 35 SUCCESSFUL = 0 36 MALFORMED_REQUEST = 1 37 INTERNAL_ERROR = 2 38 TRY_LATER = 3 39 SIG_REQUIRED = 5 40 UNAUTHORIZED = 6 41 42 43_RESPONSE_STATUS_TO_ENUM = dict((x.value, x) for x in OCSPResponseStatus) 44_ALLOWED_HASHES = ( 45 hashes.SHA1, hashes.SHA224, hashes.SHA256, 46 hashes.SHA384, hashes.SHA512 47) 48 49 50def _verify_algorithm(algorithm): 51 if not isinstance(algorithm, _ALLOWED_HASHES): 52 raise ValueError( 53 "Algorithm must be SHA1, SHA224, SHA256, SHA384, or SHA512" 54 ) 55 56 57class OCSPCertStatus(Enum): 58 GOOD = 0 59 REVOKED = 1 60 UNKNOWN = 2 61 62 63_CERT_STATUS_TO_ENUM = dict((x.value, x) for x in OCSPCertStatus) 64 65 66def load_der_ocsp_request(data): 67 from cryptography.hazmat.backends.openssl.backend import backend 68 return backend.load_der_ocsp_request(data) 69 70 71def load_der_ocsp_response(data): 72 from cryptography.hazmat.backends.openssl.backend import backend 73 return backend.load_der_ocsp_response(data) 74 75 76class OCSPRequestBuilder(object): 77 def __init__(self, request=None, extensions=[]): 78 self._request = request 79 self._extensions = extensions 80 81 def add_certificate(self, cert, issuer, algorithm): 82 if self._request is not None: 83 raise ValueError("Only one certificate can be added to a request") 84 85 _verify_algorithm(algorithm) 86 if ( 87 not isinstance(cert, x509.Certificate) or 88 not isinstance(issuer, x509.Certificate) 89 ): 90 raise TypeError("cert and issuer must be a Certificate") 91 92 return OCSPRequestBuilder((cert, issuer, algorithm), self._extensions) 93 94 def add_extension(self, extension, critical): 95 if not isinstance(extension, x509.ExtensionType): 96 raise TypeError("extension must be an ExtensionType") 97 98 extension = x509.Extension(extension.oid, critical, extension) 99 _reject_duplicate_extension(extension, self._extensions) 100 101 return OCSPRequestBuilder( 102 self._request, self._extensions + [extension] 103 ) 104 105 def build(self): 106 from cryptography.hazmat.backends.openssl.backend import backend 107 if self._request is None: 108 raise ValueError("You must add a certificate before building") 109 110 return backend.create_ocsp_request(self) 111 112 113class _SingleResponse(object): 114 def __init__(self, cert, issuer, algorithm, cert_status, this_update, 115 next_update, revocation_time, revocation_reason): 116 if ( 117 not isinstance(cert, x509.Certificate) or 118 not isinstance(issuer, x509.Certificate) 119 ): 120 raise TypeError("cert and issuer must be a Certificate") 121 122 _verify_algorithm(algorithm) 123 if not isinstance(this_update, datetime.datetime): 124 raise TypeError("this_update must be a datetime object") 125 if ( 126 next_update is not None and 127 not isinstance(next_update, datetime.datetime) 128 ): 129 raise TypeError("next_update must be a datetime object or None") 130 131 self._cert = cert 132 self._issuer = issuer 133 self._algorithm = algorithm 134 self._this_update = this_update 135 self._next_update = next_update 136 137 if not isinstance(cert_status, OCSPCertStatus): 138 raise TypeError( 139 "cert_status must be an item from the OCSPCertStatus enum" 140 ) 141 if cert_status is not OCSPCertStatus.REVOKED: 142 if revocation_time is not None: 143 raise ValueError( 144 "revocation_time can only be provided if the certificate " 145 "is revoked" 146 ) 147 if revocation_reason is not None: 148 raise ValueError( 149 "revocation_reason can only be provided if the certificate" 150 " is revoked" 151 ) 152 else: 153 if not isinstance(revocation_time, datetime.datetime): 154 raise TypeError("revocation_time must be a datetime object") 155 156 revocation_time = _convert_to_naive_utc_time(revocation_time) 157 if revocation_time < _EARLIEST_UTC_TIME: 158 raise ValueError('The revocation_time must be on or after' 159 ' 1950 January 1.') 160 161 if ( 162 revocation_reason is not None and 163 not isinstance(revocation_reason, x509.ReasonFlags) 164 ): 165 raise TypeError( 166 "revocation_reason must be an item from the ReasonFlags " 167 "enum or None" 168 ) 169 170 self._cert_status = cert_status 171 self._revocation_time = revocation_time 172 self._revocation_reason = revocation_reason 173 174 175class OCSPResponseBuilder(object): 176 def __init__(self, response=None, responder_id=None, certs=None, 177 extensions=[]): 178 self._response = response 179 self._responder_id = responder_id 180 self._certs = certs 181 self._extensions = extensions 182 183 def add_response(self, cert, issuer, algorithm, cert_status, this_update, 184 next_update, revocation_time, revocation_reason): 185 if self._response is not None: 186 raise ValueError("Only one response per OCSPResponse.") 187 188 singleresp = _SingleResponse( 189 cert, issuer, algorithm, cert_status, this_update, next_update, 190 revocation_time, revocation_reason 191 ) 192 return OCSPResponseBuilder( 193 singleresp, self._responder_id, 194 self._certs, self._extensions, 195 ) 196 197 def responder_id(self, encoding, responder_cert): 198 if self._responder_id is not None: 199 raise ValueError("responder_id can only be set once") 200 if not isinstance(responder_cert, x509.Certificate): 201 raise TypeError("responder_cert must be a Certificate") 202 if not isinstance(encoding, OCSPResponderEncoding): 203 raise TypeError( 204 "encoding must be an element from OCSPResponderEncoding" 205 ) 206 207 return OCSPResponseBuilder( 208 self._response, (responder_cert, encoding), 209 self._certs, self._extensions, 210 ) 211 212 def certificates(self, certs): 213 if self._certs is not None: 214 raise ValueError("certificates may only be set once") 215 certs = list(certs) 216 if len(certs) == 0: 217 raise ValueError("certs must not be an empty list") 218 if not all(isinstance(x, x509.Certificate) for x in certs): 219 raise TypeError("certs must be a list of Certificates") 220 return OCSPResponseBuilder( 221 self._response, self._responder_id, 222 certs, self._extensions, 223 ) 224 225 def add_extension(self, extension, critical): 226 if not isinstance(extension, x509.ExtensionType): 227 raise TypeError("extension must be an ExtensionType") 228 229 extension = x509.Extension(extension.oid, critical, extension) 230 _reject_duplicate_extension(extension, self._extensions) 231 232 return OCSPResponseBuilder( 233 self._response, self._responder_id, 234 self._certs, self._extensions + [extension], 235 ) 236 237 def sign(self, private_key, algorithm): 238 from cryptography.hazmat.backends.openssl.backend import backend 239 if self._response is None: 240 raise ValueError("You must add a response before signing") 241 if self._responder_id is None: 242 raise ValueError("You must add a responder_id before signing") 243 244 if not isinstance(algorithm, hashes.HashAlgorithm): 245 raise TypeError("Algorithm must be a registered hash algorithm.") 246 247 return backend.create_ocsp_response( 248 OCSPResponseStatus.SUCCESSFUL, self, private_key, algorithm 249 ) 250 251 @classmethod 252 def build_unsuccessful(cls, response_status): 253 from cryptography.hazmat.backends.openssl.backend import backend 254 if not isinstance(response_status, OCSPResponseStatus): 255 raise TypeError( 256 "response_status must be an item from OCSPResponseStatus" 257 ) 258 if response_status is OCSPResponseStatus.SUCCESSFUL: 259 raise ValueError("response_status cannot be SUCCESSFUL") 260 261 return backend.create_ocsp_response(response_status, None, None, None) 262 263 264@six.add_metaclass(abc.ABCMeta) 265class OCSPRequest(object): 266 @abc.abstractproperty 267 def issuer_key_hash(self): 268 """ 269 The hash of the issuer public key 270 """ 271 272 @abc.abstractproperty 273 def issuer_name_hash(self): 274 """ 275 The hash of the issuer name 276 """ 277 278 @abc.abstractproperty 279 def hash_algorithm(self): 280 """ 281 The hash algorithm used in the issuer name and key hashes 282 """ 283 284 @abc.abstractproperty 285 def serial_number(self): 286 """ 287 The serial number of the cert whose status is being checked 288 """ 289 @abc.abstractmethod 290 def public_bytes(self, encoding): 291 """ 292 Serializes the request to DER 293 """ 294 295 @abc.abstractproperty 296 def extensions(self): 297 """ 298 The list of request extensions. Not single request extensions. 299 """ 300 301 302@six.add_metaclass(abc.ABCMeta) 303class OCSPResponse(object): 304 @abc.abstractproperty 305 def response_status(self): 306 """ 307 The status of the response. This is a value from the OCSPResponseStatus 308 enumeration 309 """ 310 311 @abc.abstractproperty 312 def signature_algorithm_oid(self): 313 """ 314 The ObjectIdentifier of the signature algorithm 315 """ 316 317 @abc.abstractproperty 318 def signature_hash_algorithm(self): 319 """ 320 Returns a HashAlgorithm corresponding to the type of the digest signed 321 """ 322 323 @abc.abstractproperty 324 def signature(self): 325 """ 326 The signature bytes 327 """ 328 329 @abc.abstractproperty 330 def tbs_response_bytes(self): 331 """ 332 The tbsResponseData bytes 333 """ 334 335 @abc.abstractproperty 336 def certificates(self): 337 """ 338 A list of certificates used to help build a chain to verify the OCSP 339 response. This situation occurs when the OCSP responder uses a delegate 340 certificate. 341 """ 342 343 @abc.abstractproperty 344 def responder_key_hash(self): 345 """ 346 The responder's key hash or None 347 """ 348 349 @abc.abstractproperty 350 def responder_name(self): 351 """ 352 The responder's Name or None 353 """ 354 355 @abc.abstractproperty 356 def produced_at(self): 357 """ 358 The time the response was produced 359 """ 360 361 @abc.abstractproperty 362 def certificate_status(self): 363 """ 364 The status of the certificate (an element from the OCSPCertStatus enum) 365 """ 366 367 @abc.abstractproperty 368 def revocation_time(self): 369 """ 370 The date of when the certificate was revoked or None if not 371 revoked. 372 """ 373 374 @abc.abstractproperty 375 def revocation_reason(self): 376 """ 377 The reason the certificate was revoked or None if not specified or 378 not revoked. 379 """ 380 381 @abc.abstractproperty 382 def this_update(self): 383 """ 384 The most recent time at which the status being indicated is known by 385 the responder to have been correct 386 """ 387 388 @abc.abstractproperty 389 def next_update(self): 390 """ 391 The time when newer information will be available 392 """ 393 394 @abc.abstractproperty 395 def issuer_key_hash(self): 396 """ 397 The hash of the issuer public key 398 """ 399 400 @abc.abstractproperty 401 def issuer_name_hash(self): 402 """ 403 The hash of the issuer name 404 """ 405 406 @abc.abstractproperty 407 def hash_algorithm(self): 408 """ 409 The hash algorithm used in the issuer name and key hashes 410 """ 411 412 @abc.abstractproperty 413 def serial_number(self): 414 """ 415 The serial number of the cert whose status is being checked 416 """ 417 418 @abc.abstractproperty 419 def extensions(self): 420 """ 421 The list of response extensions. Not single response extensions. 422 """ 423