• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
4
5from __future__ import absolute_import, division, print_function
6
7import functools
8
9from cryptography import utils, x509
10from cryptography.exceptions import UnsupportedAlgorithm
11from cryptography.hazmat.backends.openssl.decode_asn1 import (
12    _CRL_ENTRY_REASON_CODE_TO_ENUM,
13    _asn1_integer_to_int,
14    _asn1_string_to_bytes,
15    _decode_x509_name,
16    _obj2txt,
17    _parse_asn1_generalized_time,
18)
19from cryptography.hazmat.backends.openssl.x509 import _Certificate
20from cryptography.hazmat.primitives import serialization
21from cryptography.x509.ocsp import (
22    OCSPCertStatus,
23    OCSPRequest,
24    OCSPResponse,
25    OCSPResponseStatus,
26    _CERT_STATUS_TO_ENUM,
27    _OIDS_TO_HASH,
28    _RESPONSE_STATUS_TO_ENUM,
29)
30
31
32def _requires_successful_response(func):
33    @functools.wraps(func)
34    def wrapper(self, *args):
35        if self.response_status != OCSPResponseStatus.SUCCESSFUL:
36            raise ValueError(
37                "OCSP response status is not successful so the property "
38                "has no value"
39            )
40        else:
41            return func(self, *args)
42
43    return wrapper
44
45
def _issuer_key_hash(backend, cert_id):
    """Return the issuer key hash held in ``cert_id``.

    Calls ``OCSP_id_get0_info`` with an output pointer in the third
    argument only (all other outputs are NULL), asserts that the call
    returned 1 and filled the pointer, then converts the result with
    ``_asn1_string_to_bytes``.
    """
    ffi = backend._ffi
    null = ffi.NULL
    out = ffi.new("ASN1_OCTET_STRING **")
    status = backend._lib.OCSP_id_get0_info(null, null, out, null, cert_id)
    backend.openssl_assert(status == 1)
    backend.openssl_assert(out[0] != null)
    return _asn1_string_to_bytes(backend, out[0])
58
59
def _issuer_name_hash(backend, cert_id):
    """Return the issuer name hash held in ``cert_id``.

    Calls ``OCSP_id_get0_info`` with an output pointer in the first
    argument only (all other outputs are NULL), asserts that the call
    returned 1 and filled the pointer, then converts the result with
    ``_asn1_string_to_bytes``.
    """
    ffi = backend._ffi
    null = ffi.NULL
    out = ffi.new("ASN1_OCTET_STRING **")
    status = backend._lib.OCSP_id_get0_info(out, null, null, null, cert_id)
    backend.openssl_assert(status == 1)
    backend.openssl_assert(out[0] != null)
    return _asn1_string_to_bytes(backend, out[0])
72
73
def _serial_number(backend, cert_id):
    """Return the serial number held in ``cert_id``.

    Calls ``OCSP_id_get0_info`` with an output pointer in the fourth
    argument only (all other outputs are NULL), asserts that the call
    returned 1 and filled the pointer, then converts the result with
    ``_asn1_integer_to_int``.
    """
    ffi = backend._ffi
    null = ffi.NULL
    out = ffi.new("ASN1_INTEGER **")
    status = backend._lib.OCSP_id_get0_info(null, null, null, out, cert_id)
    backend.openssl_assert(status == 1)
    backend.openssl_assert(out[0] != null)
    return _asn1_integer_to_int(backend, out[0])
82
83
def _hash_algorithm(backend, cert_id):
    """Return the ``_OIDS_TO_HASH`` entry for the algorithm in ``cert_id``.

    Calls ``OCSP_id_get0_info`` with an output pointer in the second
    argument only (all other outputs are NULL), asserts that the call
    returned 1 and filled the pointer, and turns the object into text with
    ``_obj2txt``.

    Raises ``UnsupportedAlgorithm`` when that text is not a key of
    ``_OIDS_TO_HASH``.
    """
    ffi = backend._ffi
    null = ffi.NULL
    out = ffi.new("ASN1_OBJECT **")
    status = backend._lib.OCSP_id_get0_info(null, out, null, null, cert_id)
    backend.openssl_assert(status == 1)
    backend.openssl_assert(out[0] != null)
    dotted = _obj2txt(backend, out[0])
    try:
        algorithm = _OIDS_TO_HASH[dotted]
    except KeyError:
        raise UnsupportedAlgorithm(
            "Signature algorithm OID: {} not recognized".format(dotted)
        )
    return algorithm
102
103
@utils.register_interface(OCSPResponse)
class _OCSPResponse(object):
    """OCSP response wrapper registered against ``OCSPResponse``.

    Stores the backend and the backend-level response object. When the
    response status is ``SUCCESSFUL`` the constructor also stores the basic
    response, its single response entry and that entry's certificate ID.
    Accessors decorated with ``_requires_successful_response`` read from
    those and raise ``ValueError`` for any other status.
    """

    def __init__(self, backend, ocsp_response):
        """Record the response status and, on success, the nested handles.

        Raises ``ValueError`` when a successful response does not report
        exactly one single-response entry.
        """
        self._backend = backend
        self._ocsp_response = ocsp_response
        lib = backend._lib
        ffi = backend._ffi

        raw_status = lib.OCSP_response_status(ocsp_response)
        backend.openssl_assert(raw_status in _RESPONSE_STATUS_TO_ENUM)
        self._status = _RESPONSE_STATUS_TO_ENUM[raw_status]
        if self._status is not OCSPResponseStatus.SUCCESSFUL:
            # Nothing else is extracted for unsuccessful responses.
            return

        basic_resp = lib.OCSP_response_get1_basic(ocsp_response)
        backend.openssl_assert(basic_resp != ffi.NULL)
        # Register OCSP_BASICRESP_free as the destructor for this handle.
        self._basic = ffi.gc(basic_resp, lib.OCSP_BASICRESP_free)

        entry_count = lib.OCSP_resp_count(self._basic)
        if entry_count != 1:
            raise ValueError(
                "OCSP response contains more than one SINGLERESP structure"
                ", which this library does not support. "
                "{} found".format(entry_count)
            )

        self._single = lib.OCSP_resp_get0(self._basic, 0)
        backend.openssl_assert(self._single != ffi.NULL)
        self._cert_id = lib.OCSP_SINGLERESP_get0_id(self._single)
        backend.openssl_assert(self._cert_id != ffi.NULL)

    response_status = utils.read_only_property("_status")

    @property
    @_requires_successful_response
    def signature_algorithm_oid(self):
        """Return the signature algorithm as an ``x509.ObjectIdentifier``."""
        backend = self._backend
        sig_alg = backend._lib.OCSP_resp_get0_tbs_sigalg(self._basic)
        backend.openssl_assert(sig_alg != backend._ffi.NULL)
        return x509.ObjectIdentifier(_obj2txt(backend, sig_alg.algorithm))

    @property
    @_requires_successful_response
    def signature_hash_algorithm(self):
        """Look up ``signature_algorithm_oid`` in ``x509._SIG_OIDS_TO_HASH``.

        Raises ``UnsupportedAlgorithm`` when the OID is not a key there.
        """
        sig_oid = self.signature_algorithm_oid
        try:
            algorithm = x509._SIG_OIDS_TO_HASH[sig_oid]
        except KeyError:
            raise UnsupportedAlgorithm(
                "Signature algorithm OID:{} not recognized".format(sig_oid)
            )
        return algorithm

    @property
    @_requires_successful_response
    def signature(self):
        """Return the basic response's signature as bytes."""
        backend = self._backend
        raw_sig = backend._lib.OCSP_resp_get0_signature(self._basic)
        backend.openssl_assert(raw_sig != backend._ffi.NULL)
        return _asn1_string_to_bytes(backend, raw_sig)

    @property
    @_requires_successful_response
    def tbs_response_bytes(self):
        """Return the output of ``i2d_OCSP_RESPDATA`` for this response."""
        backend = self._backend
        lib = backend._lib
        ffi = backend._ffi

        resp_data = lib.OCSP_resp_get0_respdata(self._basic)
        backend.openssl_assert(resp_data != ffi.NULL)
        out = ffi.new("unsigned char **")
        length = lib.i2d_OCSP_RESPDATA(resp_data, out)
        backend.openssl_assert(out[0] != ffi.NULL)
        # The buffer written to out[0] is released with OPENSSL_free once
        # the gc-wrapped pointer is collected.
        out = ffi.gc(out, lambda ptr: lib.OPENSSL_free(ptr[0]))
        backend.openssl_assert(length > 0)
        # Slice the buffer to copy the bytes out before returning.
        return ffi.buffer(out[0], length)[:]

    @property
    @_requires_successful_response
    def certificates(self):
        """Return a list of ``_Certificate`` objects from the response."""
        backend = self._backend
        lib = backend._lib
        stack = lib.OCSP_resp_get0_certs(self._basic)
        found = []
        for index in range(lib.sk_X509_num(stack)):
            raw_cert = lib.sk_X509_value(stack, index)
            backend.openssl_assert(raw_cert != backend._ffi.NULL)
            wrapped = _Certificate(backend, raw_cert)
            # The OCSP response the certificate came from has to outlive
            # the Certificate object, so hand it a private reference to us.
            wrapped._ocsp_resp = self
            found.append(wrapped)
        return found

    @property
    @_requires_successful_response
    def responder_key_hash(self):
        """Return the responder key hash bytes, or ``None`` if it is NULL."""
        _, key_hash = self._responder_key_name()
        if key_hash == self._backend._ffi.NULL:
            return None
        return _asn1_string_to_bytes(self._backend, key_hash)

    @property
    @_requires_successful_response
    def responder_name(self):
        """Return the decoded responder name, or ``None`` if it is NULL."""
        name, _ = self._responder_key_name()
        if name == self._backend._ffi.NULL:
            return None
        return _decode_x509_name(self._backend, name)

    def _responder_key_name(self):
        """Return ``(x509_name, key_hash)`` as set by ``OCSP_resp_get0_id``.

        Either element may be NULL; callers check before decoding.
        """
        backend = self._backend
        ffi = backend._ffi
        key_hash_out = ffi.new("ASN1_OCTET_STRING **")
        name_out = ffi.new("X509_NAME **")
        status = backend._lib.OCSP_resp_get0_id(
            self._basic, key_hash_out, name_out
        )
        backend.openssl_assert(status == 1)
        return name_out[0], key_hash_out[0]

    @property
    @_requires_successful_response
    def produced_at(self):
        """Return the parsed ``OCSP_resp_get0_produced_at`` time."""
        backend = self._backend
        raw_time = backend._lib.OCSP_resp_get0_produced_at(self._basic)
        return _parse_asn1_generalized_time(backend, raw_time)

    @property
    @_requires_successful_response
    def certificate_status(self):
        """Return the ``_CERT_STATUS_TO_ENUM`` entry for the single response."""
        backend = self._backend
        null = backend._ffi.NULL
        # Only the return value is needed, so every output pointer is NULL.
        raw_status = backend._lib.OCSP_single_get0_status(
            self._single, null, null, null, null
        )
        backend.openssl_assert(raw_status in _CERT_STATUS_TO_ENUM)
        return _CERT_STATUS_TO_ENUM[raw_status]

    @property
    @_requires_successful_response
    def revocation_time(self):
        """Return the parsed revocation time, or ``None`` if not revoked."""
        if self.certificate_status is not OCSPCertStatus.REVOKED:
            return None

        backend = self._backend
        ffi = backend._ffi
        null = ffi.NULL
        time_out = ffi.new("ASN1_GENERALIZEDTIME **")
        backend._lib.OCSP_single_get0_status(
            self._single, null, time_out, null, null
        )
        backend.openssl_assert(time_out[0] != null)
        return _parse_asn1_generalized_time(backend, time_out[0])

    @property
    @_requires_successful_response
    def revocation_reason(self):
        """Return the revocation reason enum member, or ``None``.

        ``None`` is returned when the certificate is not revoked or when
        the reported reason code is -1.
        """
        if self.certificate_status is not OCSPCertStatus.REVOKED:
            return None

        backend = self._backend
        ffi = backend._ffi
        null = ffi.NULL
        reason_out = ffi.new("int *")
        backend._lib.OCSP_single_get0_status(
            self._single, reason_out, null, null, null
        )
        # OpenSSL reports -1 when no reason is encoded.
        if reason_out[0] == -1:
            return None
        backend.openssl_assert(reason_out[0] in _CRL_ENTRY_REASON_CODE_TO_ENUM)
        return _CRL_ENTRY_REASON_CODE_TO_ENUM[reason_out[0]]

    @property
    @_requires_successful_response
    def this_update(self):
        """Return the parsed time from the fourth status output slot."""
        backend = self._backend
        ffi = backend._ffi
        null = ffi.NULL
        time_out = ffi.new("ASN1_GENERALIZEDTIME **")
        backend._lib.OCSP_single_get0_status(
            self._single, null, null, time_out, null
        )
        backend.openssl_assert(time_out[0] != null)
        return _parse_asn1_generalized_time(backend, time_out[0])

    @property
    @_requires_successful_response
    def next_update(self):
        """Return the parsed time from the fifth status output slot.

        Returns ``None`` when that slot is left NULL.
        """
        backend = self._backend
        ffi = backend._ffi
        null = ffi.NULL
        time_out = ffi.new("ASN1_GENERALIZEDTIME **")
        backend._lib.OCSP_single_get0_status(
            self._single, null, null, null, time_out
        )
        if time_out[0] != null:
            return _parse_asn1_generalized_time(backend, time_out[0])
        return None

    @property
    @_requires_successful_response
    def issuer_key_hash(self):
        """Return ``_issuer_key_hash`` for the stored certificate ID."""
        return _issuer_key_hash(self._backend, self._cert_id)

    @property
    @_requires_successful_response
    def issuer_name_hash(self):
        """Return ``_issuer_name_hash`` for the stored certificate ID."""
        return _issuer_name_hash(self._backend, self._cert_id)

    @property
    @_requires_successful_response
    def hash_algorithm(self):
        """Return ``_hash_algorithm`` for the stored certificate ID."""
        return _hash_algorithm(self._backend, self._cert_id)

    @property
    @_requires_successful_response
    def serial_number(self):
        """Return ``_serial_number`` for the stored certificate ID."""
        return _serial_number(self._backend, self._cert_id)

    @utils.cached_property
    @_requires_successful_response
    def extensions(self):
        """Parse the basic response with the backend's basicresp parser."""
        parser = self._backend._ocsp_basicresp_ext_parser
        return parser.parse(self._basic)

    @utils.cached_property
    @_requires_successful_response
    def single_extensions(self):
        """Parse the single response with the backend's singleresp parser."""
        parser = self._backend._ocsp_singleresp_ext_parser
        return parser.parse(self._single)

    def public_bytes(self, encoding):
        """Serialize the response via ``i2d_OCSP_RESPONSE_bio``.

        Raises ``ValueError`` unless ``encoding`` is
        ``serialization.Encoding.DER``.
        """
        if encoding is not serialization.Encoding.DER:
            raise ValueError("The only allowed encoding value is Encoding.DER")

        backend = self._backend
        mem_bio = backend._create_mem_bio_gc()
        written = backend._lib.i2d_OCSP_RESPONSE_bio(
            mem_bio, self._ocsp_response
        )
        backend.openssl_assert(written > 0)
        return backend._read_mem_bio(mem_bio)
356
357
@utils.register_interface(OCSPRequest)
class _OCSPRequest(object):
    """OCSP request wrapper registered against ``OCSPRequest``.

    Stores the backend, the backend-level request object, its first
    single-request entry and that entry's certificate ID.
    """

    def __init__(self, backend, ocsp_request):
        """Store the request handles.

        Raises ``NotImplementedError`` when the request reports more than
        one single-request entry.
        """
        lib = backend._lib
        if lib.OCSP_request_onereq_count(ocsp_request) > 1:
            raise NotImplementedError(
                "OCSP request contains more than one request"
            )
        null = backend._ffi.NULL
        self._backend = backend
        self._ocsp_request = ocsp_request
        self._request = lib.OCSP_request_onereq_get0(ocsp_request, 0)
        backend.openssl_assert(self._request != null)
        self._cert_id = lib.OCSP_onereq_get0_id(self._request)
        backend.openssl_assert(self._cert_id != null)

    @property
    def issuer_key_hash(self):
        """Return ``_issuer_key_hash`` for the stored certificate ID."""
        return _issuer_key_hash(self._backend, self._cert_id)

    @property
    def issuer_name_hash(self):
        """Return ``_issuer_name_hash`` for the stored certificate ID."""
        return _issuer_name_hash(self._backend, self._cert_id)

    @property
    def serial_number(self):
        """Return ``_serial_number`` for the stored certificate ID."""
        return _serial_number(self._backend, self._cert_id)

    @property
    def hash_algorithm(self):
        """Return ``_hash_algorithm`` for the stored certificate ID."""
        return _hash_algorithm(self._backend, self._cert_id)

    @utils.cached_property
    def extensions(self):
        """Parse the request with the backend's request extension parser."""
        parser = self._backend._ocsp_req_ext_parser
        return parser.parse(self._ocsp_request)

    def public_bytes(self, encoding):
        """Serialize the request via ``i2d_OCSP_REQUEST_bio``.

        Raises ``ValueError`` unless ``encoding`` is
        ``serialization.Encoding.DER``.
        """
        if encoding is not serialization.Encoding.DER:
            raise ValueError("The only allowed encoding value is Encoding.DER")

        backend = self._backend
        mem_bio = backend._create_mem_bio_gc()
        written = backend._lib.i2d_OCSP_REQUEST_bio(
            mem_bio, self._ocsp_request
        )
        backend.openssl_assert(written > 0)
        return backend._read_mem_bio(mem_bio)
402