• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
4
5from __future__ import absolute_import, division, print_function
6
7import abc
8import datetime
9from enum import Enum
10
11import six
12
13from cryptography import x509
14from cryptography.hazmat.primitives import hashes
15from cryptography.x509.base import (
16    _EARLIEST_UTC_TIME, _convert_to_naive_utc_time, _reject_duplicate_extension
17)
18
19
# Lookup table from a digest algorithm's dotted-string OID to an instance of
# the matching hash class from ``cryptography.hazmat.primitives.hashes``.
_OIDS_TO_HASH = {
    # SHA-1
    "1.3.14.3.2.26": hashes.SHA1(),
    # SHA-2 family
    "2.16.840.1.101.3.4.2.4": hashes.SHA224(),
    "2.16.840.1.101.3.4.2.1": hashes.SHA256(),
    "2.16.840.1.101.3.4.2.2": hashes.SHA384(),
    "2.16.840.1.101.3.4.2.3": hashes.SHA512(),
}
27
28
class OCSPResponderEncoding(Enum):
    """Ways in which the responder can be identified in an OCSP response."""

    # Identify the responder by a hash.
    HASH = "By Hash"
    # Identify the responder by its name.
    NAME = "By Name"
32
33
class OCSPResponseStatus(Enum):
    """Overall status codes of an OCSP response.

    The integer values are not contiguous: 4 is deliberately absent
    (RFC 6960 marks that value as not used).
    """

    SUCCESSFUL = 0
    MALFORMED_REQUEST = 1
    INTERNAL_ERROR = 2
    TRY_LATER = 3
    # No member for 4; see the class docstring.
    SIG_REQUIRED = 5
    UNAUTHORIZED = 6
41
42
# Reverse lookup: integer status value -> OCSPResponseStatus member.
_RESPONSE_STATUS_TO_ENUM = {
    status.value: status for status in OCSPResponseStatus
}
# Hash algorithm classes that _verify_algorithm accepts.
_ALLOWED_HASHES = (
    hashes.SHA1,
    hashes.SHA224,
    hashes.SHA256,
    hashes.SHA384,
    hashes.SHA512,
)
48
49
def _verify_algorithm(algorithm):
    """
    Check that ``algorithm`` is an instance of one of the classes listed in
    ``_ALLOWED_HASHES``.

    Returns None when the check passes and raises ValueError otherwise.
    """
    if isinstance(algorithm, _ALLOWED_HASHES):
        return

    raise ValueError(
        "Algorithm must be SHA1, SHA224, SHA256, SHA384, or SHA512"
    )
55
56
class OCSPCertStatus(Enum):
    """Possible statuses reported for a single certificate."""

    GOOD = 0
    REVOKED = 1
    UNKNOWN = 2
61
62
# Reverse lookup: integer status value -> OCSPCertStatus member.
_CERT_STATUS_TO_ENUM = {
    status.value: status for status in OCSPCertStatus
}
64
65
def load_der_ocsp_request(data):
    """
    Load an OCSP request from ``data`` by delegating to the OpenSSL backend's
    ``load_der_ocsp_request`` and return whatever the backend produces.
    """
    # The backend is imported at call time rather than at module import time.
    from cryptography.hazmat.backends.openssl.backend import backend

    request = backend.load_der_ocsp_request(data)
    return request
69
70
def load_der_ocsp_response(data):
    """
    Load an OCSP response from ``data`` by delegating to the OpenSSL backend's
    ``load_der_ocsp_response`` and return whatever the backend produces.
    """
    # The backend is imported at call time rather than at module import time.
    from cryptography.hazmat.backends.openssl.backend import backend

    response = backend.load_der_ocsp_response(data)
    return response
74
75
class OCSPRequestBuilder(object):
    """
    Builder for an OCSP request covering exactly one certificate.

    The ``add_*`` methods never modify the builder they are called on; each
    returns a new ``OCSPRequestBuilder`` carrying the updated state.
    """

    def __init__(self, request=None, extensions=None):
        """
        :param request: A ``(cert, issuer, algorithm)`` tuple, or None when
            no certificate has been added yet.
        :param extensions: An iterable of already-added extensions, or None
            for no extensions.
        """
        self._request = request
        # A fresh list per instance: a mutable default argument would be a
        # single list object shared by every default-constructed builder.
        self._extensions = [] if extensions is None else list(extensions)

    def add_certificate(self, cert, issuer, algorithm):
        """
        Return a new builder that requests the status of ``cert``.

        :param cert: The x509.Certificate whose status is being requested.
        :param issuer: The x509.Certificate of the issuer of ``cert``.
        :param algorithm: A hash algorithm accepted by ``_verify_algorithm``.
        :raises ValueError: If a certificate was already added, or the
            algorithm is rejected by ``_verify_algorithm``.
        :raises TypeError: If ``cert`` or ``issuer`` is not a Certificate.
        """
        if self._request is not None:
            raise ValueError("Only one certificate can be added to a request")

        _verify_algorithm(algorithm)
        if (
            not isinstance(cert, x509.Certificate) or
            not isinstance(issuer, x509.Certificate)
        ):
            raise TypeError("cert and issuer must be a Certificate")

        return OCSPRequestBuilder((cert, issuer, algorithm), self._extensions)

    def add_extension(self, extension, critical):
        """
        Return a new builder with ``extension`` appended.

        :param extension: An x509.ExtensionType instance.
        :param critical: Criticality value stored on the wrapping
            x509.Extension.
        :raises TypeError: If ``extension`` is not an ExtensionType.

        Duplicates are handled by ``_reject_duplicate_extension``.
        """
        if not isinstance(extension, x509.ExtensionType):
            raise TypeError("extension must be an ExtensionType")

        extension = x509.Extension(extension.oid, critical, extension)
        _reject_duplicate_extension(extension, self._extensions)

        return OCSPRequestBuilder(
            self._request, self._extensions + [extension]
        )

    def build(self):
        """
        Create the OCSP request by passing this builder to the OpenSSL
        backend's ``create_ocsp_request``.

        :raises ValueError: If no certificate has been added.
        """
        # Validate first so a misuse error does not depend on the backend
        # being importable.
        if self._request is None:
            raise ValueError("You must add a certificate before building")

        from cryptography.hazmat.backends.openssl.backend import backend

        return backend.create_ocsp_request(self)
111
112
class _SingleResponse(object):
    """
    Validated container for the data of one certificate status entry.

    All arguments are checked in the constructor; the accepted values are
    then stored on private attributes of the same names.
    """

    def __init__(self, cert, issuer, algorithm, cert_status, this_update,
                 next_update, revocation_time, revocation_reason):
        """
        :raises TypeError: If an argument has the wrong type.
        :raises ValueError: If revocation data is supplied for a certificate
            that is not revoked, or ``revocation_time`` is earlier than
            ``_EARLIEST_UTC_TIME``. ``_verify_algorithm`` may also raise.
        """
        certs_are_valid = (
            isinstance(cert, x509.Certificate) and
            isinstance(issuer, x509.Certificate)
        )
        if not certs_are_valid:
            raise TypeError("cert and issuer must be a Certificate")

        _verify_algorithm(algorithm)

        if not isinstance(this_update, datetime.datetime):
            raise TypeError("this_update must be a datetime object")
        # next_update is optional, so None is accepted alongside datetimes.
        if not (
            next_update is None or
            isinstance(next_update, datetime.datetime)
        ):
            raise TypeError("next_update must be a datetime object or None")

        if not isinstance(cert_status, OCSPCertStatus):
            raise TypeError(
                "cert_status must be an item from the OCSPCertStatus enum"
            )

        if cert_status is OCSPCertStatus.REVOKED:
            # A revoked certificate needs a revocation time; the reason
            # stays optional.
            if not isinstance(revocation_time, datetime.datetime):
                raise TypeError("revocation_time must be a datetime object")

            revocation_time = _convert_to_naive_utc_time(revocation_time)
            if revocation_time < _EARLIEST_UTC_TIME:
                raise ValueError('The revocation_time must be on or after'
                                 ' 1950 January 1.')

            if not (
                revocation_reason is None or
                isinstance(revocation_reason, x509.ReasonFlags)
            ):
                raise TypeError(
                    "revocation_reason must be an item from the ReasonFlags "
                    "enum or None"
                )
        else:
            # Revocation details make no sense for a non-revoked certificate.
            if revocation_time is not None:
                raise ValueError(
                    "revocation_time can only be provided if the certificate "
                    "is revoked"
                )
            if revocation_reason is not None:
                raise ValueError(
                    "revocation_reason can only be provided if the certificate"
                    " is revoked"
                )

        # Everything validated; record the state.
        self._cert = cert
        self._issuer = issuer
        self._algorithm = algorithm
        self._this_update = this_update
        self._next_update = next_update
        self._cert_status = cert_status
        self._revocation_time = revocation_time
        self._revocation_reason = revocation_reason
173
174
class OCSPResponseBuilder(object):
    """
    Builder for an OCSP response containing a single certificate status.

    The configuration methods never modify the builder they are called on;
    each returns a new ``OCSPResponseBuilder`` carrying the updated state.
    """

    def __init__(self, response=None, responder_id=None, certs=None,
                 extensions=None):
        """
        :param response: A ``_SingleResponse`` or None if not yet added.
        :param responder_id: A ``(responder_cert, encoding)`` tuple or None.
        :param certs: A list of certificates to include, or None.
        :param extensions: An iterable of already-added extensions, or None
            for no extensions.
        """
        self._response = response
        self._responder_id = responder_id
        self._certs = certs
        # A fresh list per instance: a mutable default argument would be a
        # single list object shared by every default-constructed builder.
        self._extensions = [] if extensions is None else list(extensions)

    def add_response(self, cert, issuer, algorithm, cert_status, this_update,
                     next_update, revocation_time, revocation_reason):
        """
        Return a new builder holding the status entry for ``cert``.

        The arguments are validated by ``_SingleResponse``.

        :raises ValueError: If a response was already added.
        """
        if self._response is not None:
            raise ValueError("Only one response per OCSPResponse.")

        singleresp = _SingleResponse(
            cert, issuer, algorithm, cert_status, this_update, next_update,
            revocation_time, revocation_reason
        )
        return OCSPResponseBuilder(
            singleresp, self._responder_id,
            self._certs, self._extensions,
        )

    def responder_id(self, encoding, responder_cert):
        """
        Return a new builder with the responder identity set.

        :param encoding: A member of ``OCSPResponderEncoding``.
        :param responder_cert: The responder's x509.Certificate.
        :raises ValueError: If the responder id was already set.
        :raises TypeError: If either argument has the wrong type.
        """
        if self._responder_id is not None:
            raise ValueError("responder_id can only be set once")
        if not isinstance(responder_cert, x509.Certificate):
            raise TypeError("responder_cert must be a Certificate")
        if not isinstance(encoding, OCSPResponderEncoding):
            raise TypeError(
                "encoding must be an element from OCSPResponderEncoding"
            )

        return OCSPResponseBuilder(
            self._response, (responder_cert, encoding),
            self._certs, self._extensions,
        )

    def certificates(self, certs):
        """
        Return a new builder with the given certificates attached.

        :param certs: A non-empty iterable of x509.Certificate objects.
        :raises ValueError: If certificates were already set or ``certs`` is
            empty.
        :raises TypeError: If any element is not a Certificate.
        """
        if self._certs is not None:
            raise ValueError("certificates may only be set once")
        # Materialize once so any iterable (including generators) works.
        certs = list(certs)
        if not certs:
            raise ValueError("certs must not be an empty list")
        if not all(isinstance(x, x509.Certificate) for x in certs):
            raise TypeError("certs must be a list of Certificates")
        return OCSPResponseBuilder(
            self._response, self._responder_id,
            certs, self._extensions,
        )

    def add_extension(self, extension, critical):
        """
        Return a new builder with ``extension`` appended.

        :param extension: An x509.ExtensionType instance.
        :param critical: Criticality value stored on the wrapping
            x509.Extension.
        :raises TypeError: If ``extension`` is not an ExtensionType.

        Duplicates are handled by ``_reject_duplicate_extension``.
        """
        if not isinstance(extension, x509.ExtensionType):
            raise TypeError("extension must be an ExtensionType")

        extension = x509.Extension(extension.oid, critical, extension)
        _reject_duplicate_extension(extension, self._extensions)

        return OCSPResponseBuilder(
            self._response, self._responder_id,
            self._certs, self._extensions + [extension],
        )

    def sign(self, private_key, algorithm):
        """
        Create a SUCCESSFUL OCSP response via the OpenSSL backend's
        ``create_ocsp_response``.

        :param private_key: The key passed to the backend for signing.
        :param algorithm: A hashes.HashAlgorithm instance.
        :raises ValueError: If no response or no responder id was added.
        :raises TypeError: If ``algorithm`` is not a HashAlgorithm.
        """
        # Validate first so a misuse error does not depend on the backend
        # being importable.
        if self._response is None:
            raise ValueError("You must add a response before signing")
        if self._responder_id is None:
            raise ValueError("You must add a responder_id before signing")

        if not isinstance(algorithm, hashes.HashAlgorithm):
            raise TypeError("Algorithm must be a registered hash algorithm.")

        from cryptography.hazmat.backends.openssl.backend import backend

        return backend.create_ocsp_response(
            OCSPResponseStatus.SUCCESSFUL, self, private_key, algorithm
        )

    @classmethod
    def build_unsuccessful(cls, response_status):
        """
        Create an OCSP response that carries only a non-successful status.

        No builder, key or algorithm is passed to the backend.

        :param response_status: A member of ``OCSPResponseStatus`` other than
            SUCCESSFUL.
        :raises TypeError: If ``response_status`` is not an
            OCSPResponseStatus.
        :raises ValueError: If ``response_status`` is SUCCESSFUL.
        """
        if not isinstance(response_status, OCSPResponseStatus):
            raise TypeError(
                "response_status must be an item from OCSPResponseStatus"
            )
        if response_status is OCSPResponseStatus.SUCCESSFUL:
            raise ValueError("response_status cannot be SUCCESSFUL")

        from cryptography.hazmat.backends.openssl.backend import backend

        return backend.create_ocsp_response(response_status, None, None, None)
262
263
@six.add_metaclass(abc.ABCMeta)
class OCSPRequest(object):
    """Abstract interface for an OCSP request object."""

    @abc.abstractproperty
    def issuer_key_hash(self):
        """
        Hash of the issuer's public key.
        """

    @abc.abstractproperty
    def issuer_name_hash(self):
        """
        Hash of the issuer's name.
        """

    @abc.abstractproperty
    def hash_algorithm(self):
        """
        Hash algorithm with which the issuer name and key hashes were
        computed.
        """

    @abc.abstractproperty
    def serial_number(self):
        """
        Serial number of the certificate whose status is being requested.
        """

    @abc.abstractmethod
    def public_bytes(self, encoding):
        """
        Serialize the request (to DER) and return the result.
        """

    @abc.abstractproperty
    def extensions(self):
        """
        Extensions of the request as a whole; extensions attached to the
        single request entry are not included.
        """
300
301
@six.add_metaclass(abc.ABCMeta)
class OCSPResponse(object):
    """Abstract interface for an OCSP response object."""

    @abc.abstractproperty
    def response_status(self):
        """
        Overall status of the response, as a member of the
        OCSPResponseStatus enumeration.
        """

    @abc.abstractproperty
    def signature_algorithm_oid(self):
        """
        ObjectIdentifier of the algorithm used for the signature.
        """

    @abc.abstractproperty
    def signature_hash_algorithm(self):
        """
        HashAlgorithm corresponding to the type of the digest that was
        signed.
        """

    @abc.abstractproperty
    def signature(self):
        """
        Bytes of the signature.
        """

    @abc.abstractproperty
    def tbs_response_bytes(self):
        """
        Bytes of the tbsResponseData.
        """

    @abc.abstractproperty
    def certificates(self):
        """
        List of certificates that help build a chain for verifying the OCSP
        response. They are present when the OCSP responder uses a delegate
        certificate.
        """

    @abc.abstractproperty
    def responder_key_hash(self):
        """
        Key hash of the responder, or None.
        """

    @abc.abstractproperty
    def responder_name(self):
        """
        Name of the responder, or None.
        """

    @abc.abstractproperty
    def produced_at(self):
        """
        Time at which the response was produced.
        """

    @abc.abstractproperty
    def certificate_status(self):
        """
        Status of the certificate, as a member of the OCSPCertStatus enum.
        """

    @abc.abstractproperty
    def revocation_time(self):
        """
        When the certificate was revoked, or None if it is not revoked.
        """

    @abc.abstractproperty
    def revocation_reason(self):
        """
        Why the certificate was revoked, or None if no reason was given or
        the certificate is not revoked.
        """

    @abc.abstractproperty
    def this_update(self):
        """
        Most recent time at which the responder knew the indicated status
        to be correct.
        """

    @abc.abstractproperty
    def next_update(self):
        """
        Time at which newer status information will be available.
        """

    @abc.abstractproperty
    def issuer_key_hash(self):
        """
        Hash of the issuer's public key.
        """

    @abc.abstractproperty
    def issuer_name_hash(self):
        """
        Hash of the issuer's name.
        """

    @abc.abstractproperty
    def hash_algorithm(self):
        """
        Hash algorithm with which the issuer name and key hashes were
        computed.
        """

    @abc.abstractproperty
    def serial_number(self):
        """
        Serial number of the certificate whose status is being reported.
        """

    @abc.abstractproperty
    def extensions(self):
        """
        Extensions of the response as a whole; extensions attached to the
        single response entry are not included.
        """
423