1# This file is dual licensed under the terms of the Apache License, Version 2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 3# for complete details. 4 5from __future__ import absolute_import, division, print_function 6 7import functools 8 9from cryptography import utils, x509 10from cryptography.exceptions import UnsupportedAlgorithm 11from cryptography.hazmat.backends.openssl.decode_asn1 import ( 12 _CRL_ENTRY_REASON_CODE_TO_ENUM, 13 _asn1_integer_to_int, 14 _asn1_string_to_bytes, 15 _decode_x509_name, 16 _obj2txt, 17 _parse_asn1_generalized_time, 18) 19from cryptography.hazmat.backends.openssl.x509 import _Certificate 20from cryptography.hazmat.primitives import serialization 21from cryptography.x509.ocsp import ( 22 OCSPCertStatus, 23 OCSPRequest, 24 OCSPResponse, 25 OCSPResponseStatus, 26 _CERT_STATUS_TO_ENUM, 27 _OIDS_TO_HASH, 28 _RESPONSE_STATUS_TO_ENUM, 29) 30 31 32def _requires_successful_response(func): 33 @functools.wraps(func) 34 def wrapper(self, *args): 35 if self.response_status != OCSPResponseStatus.SUCCESSFUL: 36 raise ValueError( 37 "OCSP response status is not successful so the property " 38 "has no value" 39 ) 40 else: 41 return func(self, *args) 42 43 return wrapper 44 45 46def _issuer_key_hash(backend, cert_id): 47 key_hash = backend._ffi.new("ASN1_OCTET_STRING **") 48 res = backend._lib.OCSP_id_get0_info( 49 backend._ffi.NULL, 50 backend._ffi.NULL, 51 key_hash, 52 backend._ffi.NULL, 53 cert_id, 54 ) 55 backend.openssl_assert(res == 1) 56 backend.openssl_assert(key_hash[0] != backend._ffi.NULL) 57 return _asn1_string_to_bytes(backend, key_hash[0]) 58 59 60def _issuer_name_hash(backend, cert_id): 61 name_hash = backend._ffi.new("ASN1_OCTET_STRING **") 62 res = backend._lib.OCSP_id_get0_info( 63 name_hash, 64 backend._ffi.NULL, 65 backend._ffi.NULL, 66 backend._ffi.NULL, 67 cert_id, 68 ) 69 backend.openssl_assert(res == 1) 70 backend.openssl_assert(name_hash[0] != backend._ffi.NULL) 71 return _asn1_string_to_bytes(backend, name_hash[0]) 72 73 74def _serial_number(backend, cert_id): 75 num = backend._ffi.new("ASN1_INTEGER **") 76 res = backend._lib.OCSP_id_get0_info( 77 backend._ffi.NULL, backend._ffi.NULL, backend._ffi.NULL, num, cert_id 78 ) 79 backend.openssl_assert(res == 1) 80 backend.openssl_assert(num[0] != backend._ffi.NULL) 81 return _asn1_integer_to_int(backend, num[0]) 82 83 84def _hash_algorithm(backend, cert_id): 85 asn1obj = backend._ffi.new("ASN1_OBJECT **") 86 res = backend._lib.OCSP_id_get0_info( 87 backend._ffi.NULL, 88 asn1obj, 89 backend._ffi.NULL, 90 backend._ffi.NULL, 91 cert_id, 92 ) 93 backend.openssl_assert(res == 1) 94 backend.openssl_assert(asn1obj[0] != backend._ffi.NULL) 95 oid = _obj2txt(backend, asn1obj[0]) 96 try: 97 return _OIDS_TO_HASH[oid] 98 except KeyError: 99 raise UnsupportedAlgorithm( 100 "Signature algorithm OID: {} not recognized".format(oid) 101 ) 102 103 104@utils.register_interface(OCSPResponse) 105class _OCSPResponse(object): 106 def __init__(self, backend, ocsp_response): 107 self._backend = backend 108 self._ocsp_response = ocsp_response 109 status = self._backend._lib.OCSP_response_status(self._ocsp_response) 110 self._backend.openssl_assert(status in _RESPONSE_STATUS_TO_ENUM) 111 self._status = _RESPONSE_STATUS_TO_ENUM[status] 112 if self._status is OCSPResponseStatus.SUCCESSFUL: 113 basic = self._backend._lib.OCSP_response_get1_basic( 114 self._ocsp_response 115 ) 116 self._backend.openssl_assert(basic != self._backend._ffi.NULL) 117 self._basic = self._backend._ffi.gc( 118 basic, self._backend._lib.OCSP_BASICRESP_free 119 ) 120 num_resp = self._backend._lib.OCSP_resp_count(self._basic) 121 if num_resp != 1: 122 raise ValueError( 123 "OCSP response contains more than one SINGLERESP structure" 124 ", which this library does not support. " 125 "{} found".format(num_resp) 126 ) 127 self._single = self._backend._lib.OCSP_resp_get0(self._basic, 0) 128 self._backend.openssl_assert( 129 self._single != self._backend._ffi.NULL 130 ) 131 self._cert_id = self._backend._lib.OCSP_SINGLERESP_get0_id( 132 self._single 133 ) 134 self._backend.openssl_assert( 135 self._cert_id != self._backend._ffi.NULL 136 ) 137 138 response_status = utils.read_only_property("_status") 139 140 @property 141 @_requires_successful_response 142 def signature_algorithm_oid(self): 143 alg = self._backend._lib.OCSP_resp_get0_tbs_sigalg(self._basic) 144 self._backend.openssl_assert(alg != self._backend._ffi.NULL) 145 oid = _obj2txt(self._backend, alg.algorithm) 146 return x509.ObjectIdentifier(oid) 147 148 @property 149 @_requires_successful_response 150 def signature_hash_algorithm(self): 151 oid = self.signature_algorithm_oid 152 try: 153 return x509._SIG_OIDS_TO_HASH[oid] 154 except KeyError: 155 raise UnsupportedAlgorithm( 156 "Signature algorithm OID:{} not recognized".format(oid) 157 ) 158 159 @property 160 @_requires_successful_response 161 def signature(self): 162 sig = self._backend._lib.OCSP_resp_get0_signature(self._basic) 163 self._backend.openssl_assert(sig != self._backend._ffi.NULL) 164 return _asn1_string_to_bytes(self._backend, sig) 165 166 @property 167 @_requires_successful_response 168 def tbs_response_bytes(self): 169 respdata = self._backend._lib.OCSP_resp_get0_respdata(self._basic) 170 self._backend.openssl_assert(respdata != self._backend._ffi.NULL) 171 pp = self._backend._ffi.new("unsigned char **") 172 res = self._backend._lib.i2d_OCSP_RESPDATA(respdata, pp) 173 self._backend.openssl_assert(pp[0] != self._backend._ffi.NULL) 174 pp = self._backend._ffi.gc( 175 pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0]) 176 ) 177 self._backend.openssl_assert(res > 0) 178 return self._backend._ffi.buffer(pp[0], res)[:] 179 180 @property 181 @_requires_successful_response 182 def certificates(self): 183 sk_x509 = self._backend._lib.OCSP_resp_get0_certs(self._basic) 184 num = self._backend._lib.sk_X509_num(sk_x509) 185 certs = [] 186 for i in range(num): 187 x509 = self._backend._lib.sk_X509_value(sk_x509, i) 188 self._backend.openssl_assert(x509 != self._backend._ffi.NULL) 189 cert = _Certificate(self._backend, x509) 190 # We need to keep the OCSP response that the certificate came from 191 # alive until the Certificate object itself goes out of scope, so 192 # we give it a private reference. 193 cert._ocsp_resp = self 194 certs.append(cert) 195 196 return certs 197 198 @property 199 @_requires_successful_response 200 def responder_key_hash(self): 201 _, asn1_string = self._responder_key_name() 202 if asn1_string == self._backend._ffi.NULL: 203 return None 204 else: 205 return _asn1_string_to_bytes(self._backend, asn1_string) 206 207 @property 208 @_requires_successful_response 209 def responder_name(self): 210 x509_name, _ = self._responder_key_name() 211 if x509_name == self._backend._ffi.NULL: 212 return None 213 else: 214 return _decode_x509_name(self._backend, x509_name) 215 216 def _responder_key_name(self): 217 asn1_string = self._backend._ffi.new("ASN1_OCTET_STRING **") 218 x509_name = self._backend._ffi.new("X509_NAME **") 219 res = self._backend._lib.OCSP_resp_get0_id( 220 self._basic, asn1_string, x509_name 221 ) 222 self._backend.openssl_assert(res == 1) 223 return x509_name[0], asn1_string[0] 224 225 @property 226 @_requires_successful_response 227 def produced_at(self): 228 produced_at = self._backend._lib.OCSP_resp_get0_produced_at( 229 self._basic 230 ) 231 return _parse_asn1_generalized_time(self._backend, produced_at) 232 233 @property 234 @_requires_successful_response 235 def certificate_status(self): 236 status = self._backend._lib.OCSP_single_get0_status( 237 self._single, 238 self._backend._ffi.NULL, 239 self._backend._ffi.NULL, 240 self._backend._ffi.NULL, 241 self._backend._ffi.NULL, 242 ) 243 self._backend.openssl_assert(status in _CERT_STATUS_TO_ENUM) 244 return _CERT_STATUS_TO_ENUM[status] 245 246 @property 247 @_requires_successful_response 248 def revocation_time(self): 249 if self.certificate_status is not OCSPCertStatus.REVOKED: 250 return None 251 252 asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **") 253 self._backend._lib.OCSP_single_get0_status( 254 self._single, 255 self._backend._ffi.NULL, 256 asn1_time, 257 self._backend._ffi.NULL, 258 self._backend._ffi.NULL, 259 ) 260 self._backend.openssl_assert(asn1_time[0] != self._backend._ffi.NULL) 261 return _parse_asn1_generalized_time(self._backend, asn1_time[0]) 262 263 @property 264 @_requires_successful_response 265 def revocation_reason(self): 266 if self.certificate_status is not OCSPCertStatus.REVOKED: 267 return None 268 269 reason_ptr = self._backend._ffi.new("int *") 270 self._backend._lib.OCSP_single_get0_status( 271 self._single, 272 reason_ptr, 273 self._backend._ffi.NULL, 274 self._backend._ffi.NULL, 275 self._backend._ffi.NULL, 276 ) 277 # If no reason is encoded OpenSSL returns -1 278 if reason_ptr[0] == -1: 279 return None 280 else: 281 self._backend.openssl_assert( 282 reason_ptr[0] in _CRL_ENTRY_REASON_CODE_TO_ENUM 283 ) 284 return _CRL_ENTRY_REASON_CODE_TO_ENUM[reason_ptr[0]] 285 286 @property 287 @_requires_successful_response 288 def this_update(self): 289 asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **") 290 self._backend._lib.OCSP_single_get0_status( 291 self._single, 292 self._backend._ffi.NULL, 293 self._backend._ffi.NULL, 294 asn1_time, 295 self._backend._ffi.NULL, 296 ) 297 self._backend.openssl_assert(asn1_time[0] != self._backend._ffi.NULL) 298 return _parse_asn1_generalized_time(self._backend, asn1_time[0]) 299 300 @property 301 @_requires_successful_response 302 def next_update(self): 303 asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **") 304 self._backend._lib.OCSP_single_get0_status( 305 self._single, 306 self._backend._ffi.NULL, 307 self._backend._ffi.NULL, 308 self._backend._ffi.NULL, 309 asn1_time, 310 ) 311 if asn1_time[0] != self._backend._ffi.NULL: 312 return _parse_asn1_generalized_time(self._backend, asn1_time[0]) 313 else: 314 return None 315 316 @property 317 @_requires_successful_response 318 def issuer_key_hash(self): 319 return _issuer_key_hash(self._backend, self._cert_id) 320 321 @property 322 @_requires_successful_response 323 def issuer_name_hash(self): 324 return _issuer_name_hash(self._backend, self._cert_id) 325 326 @property 327 @_requires_successful_response 328 def hash_algorithm(self): 329 return _hash_algorithm(self._backend, self._cert_id) 330 331 @property 332 @_requires_successful_response 333 def serial_number(self): 334 return _serial_number(self._backend, self._cert_id) 335 336 @utils.cached_property 337 @_requires_successful_response 338 def extensions(self): 339 return self._backend._ocsp_basicresp_ext_parser.parse(self._basic) 340 341 @utils.cached_property 342 @_requires_successful_response 343 def single_extensions(self): 344 return self._backend._ocsp_singleresp_ext_parser.parse(self._single) 345 346 def public_bytes(self, encoding): 347 if encoding is not serialization.Encoding.DER: 348 raise ValueError("The only allowed encoding value is Encoding.DER") 349 350 bio = self._backend._create_mem_bio_gc() 351 res = self._backend._lib.i2d_OCSP_RESPONSE_bio( 352 bio, self._ocsp_response 353 ) 354 self._backend.openssl_assert(res > 0) 355 return self._backend._read_mem_bio(bio) 356 357 358@utils.register_interface(OCSPRequest) 359class _OCSPRequest(object): 360 def __init__(self, backend, ocsp_request): 361 if backend._lib.OCSP_request_onereq_count(ocsp_request) > 1: 362 raise NotImplementedError( 363 "OCSP request contains more than one request" 364 ) 365 self._backend = backend 366 self._ocsp_request = ocsp_request 367 self._request = self._backend._lib.OCSP_request_onereq_get0( 368 self._ocsp_request, 0 369 ) 370 self._backend.openssl_assert(self._request != self._backend._ffi.NULL) 371 self._cert_id = self._backend._lib.OCSP_onereq_get0_id(self._request) 372 self._backend.openssl_assert(self._cert_id != self._backend._ffi.NULL) 373 374 @property 375 def issuer_key_hash(self): 376 return _issuer_key_hash(self._backend, self._cert_id) 377 378 @property 379 def issuer_name_hash(self): 380 return _issuer_name_hash(self._backend, self._cert_id) 381 382 @property 383 def serial_number(self): 384 return _serial_number(self._backend, self._cert_id) 385 386 @property 387 def hash_algorithm(self): 388 return _hash_algorithm(self._backend, self._cert_id) 389 390 @utils.cached_property 391 def extensions(self): 392 return self._backend._ocsp_req_ext_parser.parse(self._ocsp_request) 393 394 def public_bytes(self, encoding): 395 if encoding is not serialization.Encoding.DER: 396 raise ValueError("The only allowed encoding value is Encoding.DER") 397 398 bio = self._backend._create_mem_bio_gc() 399 res = self._backend._lib.i2d_OCSP_REQUEST_bio(bio, self._ocsp_request) 400 self._backend.openssl_assert(res > 0) 401 return self._backend._read_mem_bio(bio) 402