• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
4
5from __future__ import absolute_import, division, print_function
6
7import functools
8
9from cryptography import utils, x509
10from cryptography.exceptions import UnsupportedAlgorithm
11from cryptography.hazmat.backends.openssl.decode_asn1 import (
12    _CRL_ENTRY_REASON_CODE_TO_ENUM, _OCSP_BASICRESP_EXT_PARSER,
13    _OCSP_REQ_EXT_PARSER, _asn1_integer_to_int,
14    _asn1_string_to_bytes, _decode_x509_name, _obj2txt,
15    _parse_asn1_generalized_time,
16)
17from cryptography.hazmat.backends.openssl.x509 import _Certificate
18from cryptography.hazmat.primitives import serialization
19from cryptography.x509.ocsp import (
20    OCSPCertStatus, OCSPRequest, OCSPResponse, OCSPResponseStatus,
21    _CERT_STATUS_TO_ENUM, _OIDS_TO_HASH, _RESPONSE_STATUS_TO_ENUM,
22)
23
24
25def _requires_successful_response(func):
26    @functools.wraps(func)
27    def wrapper(self, *args):
28        if self.response_status != OCSPResponseStatus.SUCCESSFUL:
29            raise ValueError(
30                "OCSP response status is not successful so the property "
31                "has no value"
32            )
33        else:
34            return func(self, *args)
35
36    return wrapper
37
38
39def _issuer_key_hash(backend, cert_id):
40    key_hash = backend._ffi.new("ASN1_OCTET_STRING **")
41    res = backend._lib.OCSP_id_get0_info(
42        backend._ffi.NULL, backend._ffi.NULL,
43        key_hash, backend._ffi.NULL, cert_id
44    )
45    backend.openssl_assert(res == 1)
46    backend.openssl_assert(key_hash[0] != backend._ffi.NULL)
47    return _asn1_string_to_bytes(backend, key_hash[0])
48
49
50def _issuer_name_hash(backend, cert_id):
51    name_hash = backend._ffi.new("ASN1_OCTET_STRING **")
52    res = backend._lib.OCSP_id_get0_info(
53        name_hash, backend._ffi.NULL,
54        backend._ffi.NULL, backend._ffi.NULL, cert_id
55    )
56    backend.openssl_assert(res == 1)
57    backend.openssl_assert(name_hash[0] != backend._ffi.NULL)
58    return _asn1_string_to_bytes(backend, name_hash[0])
59
60
61def _serial_number(backend, cert_id):
62    num = backend._ffi.new("ASN1_INTEGER **")
63    res = backend._lib.OCSP_id_get0_info(
64        backend._ffi.NULL, backend._ffi.NULL,
65        backend._ffi.NULL, num, cert_id
66    )
67    backend.openssl_assert(res == 1)
68    backend.openssl_assert(num[0] != backend._ffi.NULL)
69    return _asn1_integer_to_int(backend, num[0])
70
71
72def _hash_algorithm(backend, cert_id):
73    asn1obj = backend._ffi.new("ASN1_OBJECT **")
74    res = backend._lib.OCSP_id_get0_info(
75        backend._ffi.NULL, asn1obj,
76        backend._ffi.NULL, backend._ffi.NULL, cert_id
77    )
78    backend.openssl_assert(res == 1)
79    backend.openssl_assert(asn1obj[0] != backend._ffi.NULL)
80    oid = _obj2txt(backend, asn1obj[0])
81    try:
82        return _OIDS_TO_HASH[oid]
83    except KeyError:
84        raise UnsupportedAlgorithm(
85            "Signature algorithm OID: {0} not recognized".format(oid)
86        )
87
88
89@utils.register_interface(OCSPResponse)
90class _OCSPResponse(object):
91    def __init__(self, backend, ocsp_response):
92        self._backend = backend
93        self._ocsp_response = ocsp_response
94        status = self._backend._lib.OCSP_response_status(self._ocsp_response)
95        self._backend.openssl_assert(status in _RESPONSE_STATUS_TO_ENUM)
96        self._status = _RESPONSE_STATUS_TO_ENUM[status]
97        if self._status is OCSPResponseStatus.SUCCESSFUL:
98            basic = self._backend._lib.OCSP_response_get1_basic(
99                self._ocsp_response
100            )
101            self._backend.openssl_assert(basic != self._backend._ffi.NULL)
102            self._basic = self._backend._ffi.gc(
103                basic, self._backend._lib.OCSP_BASICRESP_free
104            )
105            self._backend.openssl_assert(
106                self._backend._lib.OCSP_resp_count(self._basic) == 1
107            )
108            self._single = self._backend._lib.OCSP_resp_get0(self._basic, 0)
109            self._backend.openssl_assert(
110                self._single != self._backend._ffi.NULL
111            )
112            self._cert_id = self._backend._lib.OCSP_SINGLERESP_get0_id(
113                self._single
114            )
115            self._backend.openssl_assert(
116                self._cert_id != self._backend._ffi.NULL
117            )
118
119    response_status = utils.read_only_property("_status")
120
121    @property
122    @_requires_successful_response
123    def signature_algorithm_oid(self):
124        alg = self._backend._lib.OCSP_resp_get0_tbs_sigalg(self._basic)
125        self._backend.openssl_assert(alg != self._backend._ffi.NULL)
126        oid = _obj2txt(self._backend, alg.algorithm)
127        return x509.ObjectIdentifier(oid)
128
129    @property
130    @_requires_successful_response
131    def signature_hash_algorithm(self):
132        oid = self.signature_algorithm_oid
133        try:
134            return x509._SIG_OIDS_TO_HASH[oid]
135        except KeyError:
136            raise UnsupportedAlgorithm(
137                "Signature algorithm OID:{0} not recognized".format(oid)
138            )
139
140    @property
141    @_requires_successful_response
142    def signature(self):
143        sig = self._backend._lib.OCSP_resp_get0_signature(self._basic)
144        self._backend.openssl_assert(sig != self._backend._ffi.NULL)
145        return _asn1_string_to_bytes(self._backend, sig)
146
147    @property
148    @_requires_successful_response
149    def tbs_response_bytes(self):
150        respdata = self._backend._lib.OCSP_resp_get0_respdata(self._basic)
151        self._backend.openssl_assert(respdata != self._backend._ffi.NULL)
152        pp = self._backend._ffi.new("unsigned char **")
153        res = self._backend._lib.i2d_OCSP_RESPDATA(respdata, pp)
154        self._backend.openssl_assert(pp[0] != self._backend._ffi.NULL)
155        pp = self._backend._ffi.gc(
156            pp, lambda pointer: self._backend._lib.OPENSSL_free(pointer[0])
157        )
158        self._backend.openssl_assert(res > 0)
159        return self._backend._ffi.buffer(pp[0], res)[:]
160
161    @property
162    @_requires_successful_response
163    def certificates(self):
164        sk_x509 = self._backend._lib.OCSP_resp_get0_certs(self._basic)
165        num = self._backend._lib.sk_X509_num(sk_x509)
166        certs = []
167        for i in range(num):
168            x509 = self._backend._lib.sk_X509_value(sk_x509, i)
169            self._backend.openssl_assert(x509 != self._backend._ffi.NULL)
170            cert = _Certificate(self._backend, x509)
171            # We need to keep the OCSP response that the certificate came from
172            # alive until the Certificate object itself goes out of scope, so
173            # we give it a private reference.
174            cert._ocsp_resp = self
175            certs.append(cert)
176
177        return certs
178
179    @property
180    @_requires_successful_response
181    def responder_key_hash(self):
182        _, asn1_string = self._responder_key_name()
183        if asn1_string == self._backend._ffi.NULL:
184            return None
185        else:
186            return _asn1_string_to_bytes(self._backend, asn1_string)
187
188    @property
189    @_requires_successful_response
190    def responder_name(self):
191        x509_name, _ = self._responder_key_name()
192        if x509_name == self._backend._ffi.NULL:
193            return None
194        else:
195            return _decode_x509_name(self._backend, x509_name)
196
197    def _responder_key_name(self):
198        asn1_string = self._backend._ffi.new("ASN1_OCTET_STRING **")
199        x509_name = self._backend._ffi.new("X509_NAME **")
200        res = self._backend._lib.OCSP_resp_get0_id(
201            self._basic, asn1_string, x509_name
202        )
203        self._backend.openssl_assert(res == 1)
204        return x509_name[0], asn1_string[0]
205
206    @property
207    @_requires_successful_response
208    def produced_at(self):
209        produced_at = self._backend._lib.OCSP_resp_get0_produced_at(
210            self._basic
211        )
212        return _parse_asn1_generalized_time(self._backend, produced_at)
213
214    @property
215    @_requires_successful_response
216    def certificate_status(self):
217        status = self._backend._lib.OCSP_single_get0_status(
218            self._single,
219            self._backend._ffi.NULL,
220            self._backend._ffi.NULL,
221            self._backend._ffi.NULL,
222            self._backend._ffi.NULL,
223        )
224        self._backend.openssl_assert(status in _CERT_STATUS_TO_ENUM)
225        return _CERT_STATUS_TO_ENUM[status]
226
227    @property
228    @_requires_successful_response
229    def revocation_time(self):
230        if self.certificate_status is not OCSPCertStatus.REVOKED:
231            return None
232
233        asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **")
234        self._backend._lib.OCSP_single_get0_status(
235            self._single,
236            self._backend._ffi.NULL,
237            asn1_time,
238            self._backend._ffi.NULL,
239            self._backend._ffi.NULL,
240        )
241        self._backend.openssl_assert(asn1_time[0] != self._backend._ffi.NULL)
242        return _parse_asn1_generalized_time(self._backend, asn1_time[0])
243
244    @property
245    @_requires_successful_response
246    def revocation_reason(self):
247        if self.certificate_status is not OCSPCertStatus.REVOKED:
248            return None
249
250        reason_ptr = self._backend._ffi.new("int *")
251        self._backend._lib.OCSP_single_get0_status(
252            self._single,
253            reason_ptr,
254            self._backend._ffi.NULL,
255            self._backend._ffi.NULL,
256            self._backend._ffi.NULL,
257        )
258        # If no reason is encoded OpenSSL returns -1
259        if reason_ptr[0] == -1:
260            return None
261        else:
262            self._backend.openssl_assert(
263                reason_ptr[0] in _CRL_ENTRY_REASON_CODE_TO_ENUM
264            )
265            return _CRL_ENTRY_REASON_CODE_TO_ENUM[reason_ptr[0]]
266
267    @property
268    @_requires_successful_response
269    def this_update(self):
270        asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **")
271        self._backend._lib.OCSP_single_get0_status(
272            self._single,
273            self._backend._ffi.NULL,
274            self._backend._ffi.NULL,
275            asn1_time,
276            self._backend._ffi.NULL,
277        )
278        self._backend.openssl_assert(asn1_time[0] != self._backend._ffi.NULL)
279        return _parse_asn1_generalized_time(self._backend, asn1_time[0])
280
281    @property
282    @_requires_successful_response
283    def next_update(self):
284        asn1_time = self._backend._ffi.new("ASN1_GENERALIZEDTIME **")
285        self._backend._lib.OCSP_single_get0_status(
286            self._single,
287            self._backend._ffi.NULL,
288            self._backend._ffi.NULL,
289            self._backend._ffi.NULL,
290            asn1_time,
291        )
292        if asn1_time[0] != self._backend._ffi.NULL:
293            return _parse_asn1_generalized_time(self._backend, asn1_time[0])
294        else:
295            return None
296
297    @property
298    @_requires_successful_response
299    def issuer_key_hash(self):
300        return _issuer_key_hash(self._backend, self._cert_id)
301
302    @property
303    @_requires_successful_response
304    def issuer_name_hash(self):
305        return _issuer_name_hash(self._backend, self._cert_id)
306
307    @property
308    @_requires_successful_response
309    def hash_algorithm(self):
310        return _hash_algorithm(self._backend, self._cert_id)
311
312    @property
313    @_requires_successful_response
314    def serial_number(self):
315        return _serial_number(self._backend, self._cert_id)
316
317    @utils.cached_property
318    @_requires_successful_response
319    def extensions(self):
320        return _OCSP_BASICRESP_EXT_PARSER.parse(self._backend, self._basic)
321
322    def public_bytes(self, encoding):
323        if encoding is not serialization.Encoding.DER:
324            raise ValueError(
325                "The only allowed encoding value is Encoding.DER"
326            )
327
328        bio = self._backend._create_mem_bio_gc()
329        res = self._backend._lib.i2d_OCSP_RESPONSE_bio(
330            bio, self._ocsp_response
331        )
332        self._backend.openssl_assert(res > 0)
333        return self._backend._read_mem_bio(bio)
334
335
336@utils.register_interface(OCSPRequest)
337class _OCSPRequest(object):
338    def __init__(self, backend, ocsp_request):
339        if backend._lib.OCSP_request_onereq_count(ocsp_request) > 1:
340            raise NotImplementedError(
341                'OCSP request contains more than one request'
342            )
343        self._backend = backend
344        self._ocsp_request = ocsp_request
345        self._request = self._backend._lib.OCSP_request_onereq_get0(
346            self._ocsp_request, 0
347        )
348        self._backend.openssl_assert(self._request != self._backend._ffi.NULL)
349        self._cert_id = self._backend._lib.OCSP_onereq_get0_id(self._request)
350        self._backend.openssl_assert(self._cert_id != self._backend._ffi.NULL)
351
352    @property
353    def issuer_key_hash(self):
354        return _issuer_key_hash(self._backend, self._cert_id)
355
356    @property
357    def issuer_name_hash(self):
358        return _issuer_name_hash(self._backend, self._cert_id)
359
360    @property
361    def serial_number(self):
362        return _serial_number(self._backend, self._cert_id)
363
364    @property
365    def hash_algorithm(self):
366        return _hash_algorithm(self._backend, self._cert_id)
367
368    @utils.cached_property
369    def extensions(self):
370        return _OCSP_REQ_EXT_PARSER.parse(self._backend, self._ocsp_request)
371
372    def public_bytes(self, encoding):
373        if encoding is not serialization.Encoding.DER:
374            raise ValueError(
375                "The only allowed encoding value is Encoding.DER"
376            )
377
378        bio = self._backend._create_mem_bio_gc()
379        res = self._backend._lib.i2d_OCSP_REQUEST_bio(bio, self._ocsp_request)
380        self._backend.openssl_assert(res > 0)
381        return self._backend._read_mem_bio(bio)
382