1# This file is dual licensed under the terms of the Apache License, Version 2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 3# for complete details. 4 5from __future__ import absolute_import, division, print_function 6 7import functools 8 9from cryptography import utils, x509 10from cryptography.exceptions import UnsupportedAlgorithm 11from cryptography.hazmat.backends.openssl.decode_asn1 import ( 12 _CRL_ENTRY_REASON_CODE_TO_ENUM, _OCSP_BASICRESP_EXT_PARSER, 13 _OCSP_REQ_EXT_PARSER, _asn1_integer_to_int, 14 _asn1_string_to_bytes, _decode_x509_name, _obj2txt, 15 _parse_asn1_generalized_time, 16) 17from cryptography.hazmat.backends.openssl.x509 import _Certificate 18from cryptography.hazmat.primitives import serialization 19from cryptography.x509.ocsp import ( 20 OCSPCertStatus, OCSPRequest, OCSPResponse, OCSPResponseStatus, 21 _CERT_STATUS_TO_ENUM, _OIDS_TO_HASH, _RESPONSE_STATUS_TO_ENUM, 22) 23 24 25def _requires_successful_response(func): 26 @functools.wraps(func) 27 def wrapper(self, *args): 28 if self.response_status != OCSPResponseStatus.SUCCESSFUL: 29 raise ValueError( 30 "OCSP response status is not successful so the property " 31 "has no value" 32 ) 33 else: 34 return func(self, *args) 35 36 return wrapper 37 38 39def _issuer_key_hash(backend, cert_id): 40 key_hash = backend._ffi.new("ASN1_OCTET_STRING **") 41 res = backend._lib.OCSP_id_get0_info( 42 backend._ffi.NULL, backend._ffi.NULL, 43 key_hash, backend._ffi.NULL, cert_id 44 ) 45 backend.openssl_assert(res == 1) 46 backend.openssl_assert(key_hash[0] != backend._ffi.NULL) 47 return _asn1_string_to_bytes(backend, key_hash[0]) 48 49 50def _issuer_name_hash(backend, cert_id): 51 name_hash = backend._ffi.new("ASN1_OCTET_STRING **") 52 res = backend._lib.OCSP_id_get0_info( 53 name_hash, backend._ffi.NULL, 54 backend._ffi.NULL, backend._ffi.NULL, cert_id 55 ) 56 backend.openssl_assert(res == 1) 57 backend.openssl_assert(name_hash[0] != backend._ffi.NULL) 58 return _asn1_string_to_bytes(backend, name_hash[0]) 59 60 61def _serial_number(backend, cert_id): 62 num = backend._ffi.new("ASN1_INTEGER **") 63 res = backend._lib.OCSP_id_get0_info( 64 backend._ffi.NULL, backend._ffi.NULL, 65 backend._ffi.NULL, num, cert_id 66 ) 67 backend.openssl_assert(res == 1) 68 backend.openssl_assert(num[0] != backend._ffi.NULL) 69 return _asn1_integer_to_int(backend, num[0]) 70 71 72def _hash_algorithm(backend, cert_id): 73 asn1obj = backend._ffi.new("ASN1_OBJECT **") 74 res = backend._lib.OCSP_id_get0_info( 75 backend._ffi.NULL, asn1obj, 76 backend._ffi.NULL, backend._ffi.NULL, cert_id 77 ) 78 backend.openssl_assert(res == 1) 79 backend.openssl_assert(asn1obj[0] != backend._ffi.NULL) 80 oid = _obj2txt(backend, asn1obj[0]) 81 try: 82 return _OIDS_TO_HASH[oid] 83 except KeyError: 84 raise UnsupportedAlgorithm( 85 "Signature algorithm OID: {0} not recognized".format(oid) 86 ) 87 88 89@utils.register_interface(OCSPResponse) 90class _OCSPResponse(object): 91 def __init__(self, backend, ocsp_response): 92 self._backend = backend 93 self._ocsp_response = ocsp_response 94 status = self._backend._lib.OCSP_response_status(self._ocsp_response) 95 self._backend.openssl_assert(status in _RESPONSE_STATUS_TO_ENUM) 96 self._status = _RESPONSE_STATUS_TO_ENUM[status] 97 if self._status is OCSPResponseStatus.SUCCESSFUL: 98 basic = self._backend._lib.OCSP_response_get1_basic( 99 self._ocsp_response 100 ) 101 self._backend.openssl_assert(basic != self._backend._ffi.NULL) 102 self._basic = self._backend._ffi.gc( 103 basic, self._backend._lib.OCSP_BASICRESP_free 104 ) 105 self._backend.openssl_assert( 106 self._backend._lib.OCSP_resp_count(self._basic) == 1 107 ) 108 self._single = self._backend._lib.OCSP_resp_get0(self._basic, 0) 109 self._backend.openssl_assert( 110 self._single != self._backend._ffi.NULL 111 ) 112 self._cert_id = self._backend._lib.OCSP_SINGLERESP_get0_id( 113 self._single 114 ) 115 self._backend.openssl_assert( 116 self._cert_id != self._backend._ffi.NULL 117 ) 118 119 response_status = utils.read_only_property("_status") 120 121 @property 122 @_requires_successful_response 123 def signature_algorithm_oid(self): 124 alg = self._backend._lib.OCSP_resp_get0_tbs_sigalg(self._basic) 125 self._backend.openssl_assert(alg != self._backend._ffi.NULL) 126 oid = _obj2txt(self._backend, alg.algorithm) 127 return x509.ObjectIdentifier(oid) 128 129 @property 130 @_requires_successful_response 131 def signature_hash_algorithm(self): 132 oid = self.signature_algorithm_oid 133 try: 134 return x509._SIG_OIDS_TO_HASH[oid] 135 except KeyError: 136 raise UnsupportedAlgorithm( 137 "Signature algorithm OID:{0} not recognized".format(oid) 138 ) 139 140 @property 141 @_requires_successful_response 142 def signature(self): 143 sig = self._backend._lib.OCSP_resp_get0_signature(self._basic) 144 self._backend.openssl_assert(sig != self._backend._ffi.NULL) 145 return _asn1_string_to_bytes(self._backend, sig) 146 147 @property 148 @_requires_successful_response 149 def tbs_response_bytes(self): 150 respdata = self._backend._lib.OCSP_resp_get0_respdata(self._basic) 151 self._backend.openssl_assert(respdata != self._backend._ffi.NULL) 152 pp = self._backend._ffi.new("unsigned char **") 153 res = self._backend._lib.i2d_OCSP_RESPDATA(respdata, pp) 154 self._backend.openssl_assert(pp[0] != self._backend._ffi.NULL) 155 pp = self._backend._ffi.gc( 156 pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0]) 157 ) 158 self._backend.openssl_assert(res > 0) 159 return self._backend._ffi.buffer(pp[0], res)[:] 160 161 @property 162 @_requires_successful_response 163 def certificates(self): 164 sk_x509 = self._backend._lib.OCSP_resp_get0_certs(self._basic) 165 num = self._backend._lib.sk_X509_num(sk_x509) 166 certs = [] 167 for i in range(num): 168 x509 = self._backend._lib.sk_X509_value(sk_x509, i) 169 self._backend.openssl_assert(x509 != self._backend._ffi.NULL) 170 cert = _Certificate(self._backend, x509) 171 # We need to keep the OCSP response that the certificate came from 172 # alive until the Certificate object itself goes out of scope, so 173 # we give it a private reference. 174 cert._ocsp_resp = self 175 certs.append(cert) 176 177 return certs 178 179 @property 180 @_requires_successful_response 181 def responder_key_hash(self): 182 _, asn1_string = self._responder_key_name() 183 if asn1_string == self._backend._ffi.NULL: 184 return None 185 else: 186 return _asn1_string_to_bytes(self._backend, asn1_string) 187 188 @property 189 @_requires_successful_response 190 def responder_name(self): 191 x509_name, _ = self._responder_key_name() 192 if x509_name == self._backend._ffi.NULL: 193 return None 194 else: 195 return _decode_x509_name(self._backend, x509_name) 196 197 def _responder_key_name(self): 198 asn1_string = self._backend._ffi.new("ASN1_OCTET_STRING **") 199 x509_name = self._backend._ffi.new("X509_NAME **") 200 res = self._backend._lib.OCSP_resp_get0_id( 201 self._basic, asn1_string, x509_name 202 ) 203 self._backend.openssl_assert(res == 1) 204 return x509_name[0], asn1_string[0] 205 206 @property 207 @_requires_successful_response 208 def produced_at(self): 209 produced_at = self._backend._lib.OCSP_resp_get0_produced_at( 210 self._basic 211 ) 212 return _parse_asn1_generalized_time(self._backend, produced_at) 213 214 @property 215 @_requires_successful_response 216 def certificate_status(self): 217 status = self._backend._lib.OCSP_single_get0_status( 218 self._single, 219 self._backend._ffi.NULL, 220 self._backend._ffi.NULL, 221 self._backend._ffi.NULL, 222 self._backend._ffi.NULL, 223 ) 224 self._backend.openssl_assert(status in _CERT_STATUS_TO_ENUM) 225 return _CERT_STATUS_TO_ENUM[status] 226 227 @property 228 @_requires_successful_response 229 def revocation_time(self): 230 if self.certificate_status is not OCSPCertStatus.REVOKED: 231 return None 232 233 asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **") 234 self._backend._lib.OCSP_single_get0_status( 235 self._single, 236 self._backend._ffi.NULL, 237 asn1_time, 238 self._backend._ffi.NULL, 239 self._backend._ffi.NULL, 240 ) 241 self._backend.openssl_assert(asn1_time[0] != self._backend._ffi.NULL) 242 return _parse_asn1_generalized_time(self._backend, asn1_time[0]) 243 244 @property 245 @_requires_successful_response 246 def revocation_reason(self): 247 if self.certificate_status is not OCSPCertStatus.REVOKED: 248 return None 249 250 reason_ptr = self._backend._ffi.new("int *") 251 self._backend._lib.OCSP_single_get0_status( 252 self._single, 253 reason_ptr, 254 self._backend._ffi.NULL, 255 self._backend._ffi.NULL, 256 self._backend._ffi.NULL, 257 ) 258 # If no reason is encoded OpenSSL returns -1 259 if reason_ptr[0] == -1: 260 return None 261 else: 262 self._backend.openssl_assert( 263 reason_ptr[0] in _CRL_ENTRY_REASON_CODE_TO_ENUM 264 ) 265 return _CRL_ENTRY_REASON_CODE_TO_ENUM[reason_ptr[0]] 266 267 @property 268 @_requires_successful_response 269 def this_update(self): 270 asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **") 271 self._backend._lib.OCSP_single_get0_status( 272 self._single, 273 self._backend._ffi.NULL, 274 self._backend._ffi.NULL, 275 asn1_time, 276 self._backend._ffi.NULL, 277 ) 278 self._backend.openssl_assert(asn1_time[0] != self._backend._ffi.NULL) 279 return _parse_asn1_generalized_time(self._backend, asn1_time[0]) 280 281 @property 282 @_requires_successful_response 283 def next_update(self): 284 asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **") 285 self._backend._lib.OCSP_single_get0_status( 286 self._single, 287 self._backend._ffi.NULL, 288 self._backend._ffi.NULL, 289 self._backend._ffi.NULL, 290 asn1_time, 291 ) 292 if asn1_time[0] != self._backend._ffi.NULL: 293 return _parse_asn1_generalized_time(self._backend, asn1_time[0]) 294 else: 295 return None 296 297 @property 298 @_requires_successful_response 299 def issuer_key_hash(self): 300 return _issuer_key_hash(self._backend, self._cert_id) 301 302 @property 303 @_requires_successful_response 304 def issuer_name_hash(self): 305 return _issuer_name_hash(self._backend, self._cert_id) 306 307 @property 308 @_requires_successful_response 309 def hash_algorithm(self): 310 return _hash_algorithm(self._backend, self._cert_id) 311 312 @property 313 @_requires_successful_response 314 def serial_number(self): 315 return _serial_number(self._backend, self._cert_id) 316 317 @utils.cached_property 318 @_requires_successful_response 319 def extensions(self): 320 return _OCSP_BASICRESP_EXT_PARSER.parse(self._backend, self._basic) 321 322 def public_bytes(self, encoding): 323 if encoding is not serialization.Encoding.DER: 324 raise ValueError( 325 "The only allowed encoding value is Encoding.DER" 326 ) 327 328 bio = self._backend._create_mem_bio_gc() 329 res = self._backend._lib.i2d_OCSP_RESPONSE_bio( 330 bio, self._ocsp_response 331 ) 332 self._backend.openssl_assert(res > 0) 333 return self._backend._read_mem_bio(bio) 334 335 336@utils.register_interface(OCSPRequest) 337class _OCSPRequest(object): 338 def __init__(self, backend, ocsp_request): 339 if backend._lib.OCSP_request_onereq_count(ocsp_request) > 1: 340 raise NotImplementedError( 341 'OCSP request contains more than one request' 342 ) 343 self._backend = backend 344 self._ocsp_request = ocsp_request 345 self._request = self._backend._lib.OCSP_request_onereq_get0( 346 self._ocsp_request, 0 347 ) 348 self._backend.openssl_assert(self._request != self._backend._ffi.NULL) 349 self._cert_id = self._backend._lib.OCSP_onereq_get0_id(self._request) 350 self._backend.openssl_assert(self._cert_id != self._backend._ffi.NULL) 351 352 @property 353 def issuer_key_hash(self): 354 return _issuer_key_hash(self._backend, self._cert_id) 355 356 @property 357 def issuer_name_hash(self): 358 return _issuer_name_hash(self._backend, self._cert_id) 359 360 @property 361 def serial_number(self): 362 return _serial_number(self._backend, self._cert_id) 363 364 @property 365 def hash_algorithm(self): 366 return _hash_algorithm(self._backend, self._cert_id) 367 368 @utils.cached_property 369 def extensions(self): 370 return _OCSP_REQ_EXT_PARSER.parse(self._backend, self._ocsp_request) 371 372 def public_bytes(self, encoding): 373 if encoding is not serialization.Encoding.DER: 374 raise ValueError( 375 "The only allowed encoding value is Encoding.DER" 376 ) 377 378 bio = self._backend._create_mem_bio_gc() 379 res = self._backend._lib.i2d_OCSP_REQUEST_bio(bio, self._ocsp_request) 380 self._backend.openssl_assert(res > 0) 381 return self._backend._read_mem_bio(bio) 382