• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
4
5from __future__ import absolute_import, division, print_function
6
7import abc
8import datetime
9from enum import Enum
10
11import six
12
13from cryptography import x509
14from cryptography.hazmat.primitives import hashes
15from cryptography.x509.base import (
16    _EARLIEST_UTC_TIME,
17    _convert_to_naive_utc_time,
18    _reject_duplicate_extension,
19)
20
21
# Lookup table from a digest algorithm's dotted-string OID to an instance of
# the matching hash class from ``cryptography.hazmat.primitives.hashes``.
_OIDS_TO_HASH = {
    "1.3.14.3.2.26": hashes.SHA1(),  # SHA-1
    "2.16.840.1.101.3.4.2.4": hashes.SHA224(),  # SHA-224
    "2.16.840.1.101.3.4.2.1": hashes.SHA256(),  # SHA-256
    "2.16.840.1.101.3.4.2.2": hashes.SHA384(),  # SHA-384
    "2.16.840.1.101.3.4.2.3": hashes.SHA512(),  # SHA-512
}
29
30
class OCSPResponderEncoding(Enum):
    """
    Selects how the responder is identified when a responder id is set on
    an OCSP response builder: by hash or by name.
    """

    HASH = "By Hash"
    NAME = "By Name"
34
35
class OCSPResponseStatus(Enum):
    """
    Overall status of an OCSP response.

    The member values are the integer status codes; note that there is no
    member with the value 4 (RFC 6960 marks that code as unused).
    """

    SUCCESSFUL = 0
    MALFORMED_REQUEST = 1
    INTERNAL_ERROR = 2
    TRY_LATER = 3
    # 4 is intentionally skipped.
    SIG_REQUIRED = 5
    UNAUTHORIZED = 6
43
44
# Reverse lookup: integer status code -> OCSPResponseStatus member.
_RESPONSE_STATUS_TO_ENUM = dict(
    (status.value, status) for status in OCSPResponseStatus
)

# Hash classes accepted by _verify_algorithm (checked with isinstance).
_ALLOWED_HASHES = (
    hashes.SHA1,
    hashes.SHA224,
    hashes.SHA256,
    hashes.SHA384,
    hashes.SHA512,
)
53
54
def _verify_algorithm(algorithm):
    """
    Check that ``algorithm`` is an instance of one of ``_ALLOWED_HASHES``.

    :raises ValueError: if ``algorithm`` is not an instance of any of the
        allowed hash classes.
    """
    if isinstance(algorithm, _ALLOWED_HASHES):
        return

    raise ValueError(
        "Algorithm must be SHA1, SHA224, SHA256, SHA384, or SHA512"
    )
60
61
class OCSPCertStatus(Enum):
    """
    Status reported for a single certificate in an OCSP response.

    The member values are the integer codes used for each status.
    """

    GOOD = 0
    REVOKED = 1
    UNKNOWN = 2
66
67
# Reverse lookup: integer status code -> OCSPCertStatus member.
_CERT_STATUS_TO_ENUM = dict(
    (status.value, status) for status in OCSPCertStatus
)
69
70
def load_der_ocsp_request(data):
    """
    Hand ``data`` to the OpenSSL backend's DER OCSP request loader.

    :param data: the encoded request, passed to the backend unchanged
    :returns: whatever ``backend.load_der_ocsp_request`` returns
    """
    # The backend is imported when the function is called rather than when
    # this module is imported.
    from cryptography.hazmat.backends.openssl.backend import (
        backend as openssl_backend,
    )

    loaded_request = openssl_backend.load_der_ocsp_request(data)
    return loaded_request
75
76
def load_der_ocsp_response(data):
    """
    Hand ``data`` to the OpenSSL backend's DER OCSP response loader.

    :param data: the encoded response, passed to the backend unchanged
    :returns: whatever ``backend.load_der_ocsp_response`` returns
    """
    # The backend is imported when the function is called rather than when
    # this module is imported.
    from cryptography.hazmat.backends.openssl.backend import (
        backend as openssl_backend,
    )

    loaded_response = openssl_backend.load_der_ocsp_response(data)
    return loaded_response
81
82
class OCSPRequestBuilder(object):
    """
    Builder for OCSP requests.

    The ``add_*`` methods never modify the builder they are called on; each
    returns a new ``OCSPRequestBuilder`` carrying the accumulated state.
    """

    def __init__(self, request=None, extensions=None):
        """
        :param request: a ``(cert, issuer, algorithm)`` tuple, or ``None``
            when no certificate has been added yet
        :param extensions: list of ``x509.Extension`` objects already added;
            ``None`` (the default) means "no extensions"
        """
        self._request = request
        # Give every default-constructed builder its own list. A literal
        # ``[]`` default would be one list object shared by all of them, so
        # anything mutating ``_extensions`` on one builder would leak into
        # every other.
        self._extensions = [] if extensions is None else extensions

    def add_certificate(self, cert, issuer, algorithm):
        """
        Return a new builder that requests the status of ``cert``.

        :param cert: the ``x509.Certificate`` to check
        :param issuer: the ``x509.Certificate`` of the issuer
        :param algorithm: a hash instance accepted by ``_verify_algorithm``
        :raises ValueError: if a certificate was already added, or the
            algorithm is not allowed
        :raises TypeError: if ``cert`` or ``issuer`` is not a Certificate
        """
        if self._request is not None:
            raise ValueError("Only one certificate can be added to a request")

        _verify_algorithm(algorithm)
        if not isinstance(cert, x509.Certificate) or not isinstance(
            issuer, x509.Certificate
        ):
            raise TypeError("cert and issuer must be a Certificate")

        return OCSPRequestBuilder((cert, issuer, algorithm), self._extensions)

    def add_extension(self, extension, critical):
        """
        Return a new builder with ``extension`` appended.

        :param extension: an ``x509.ExtensionType`` instance
        :param critical: criticality flag stored on the wrapping
            ``x509.Extension``
        :raises TypeError: if ``extension`` is not an ExtensionType
        """
        if not isinstance(extension, x509.ExtensionType):
            raise TypeError("extension must be an ExtensionType")

        extension = x509.Extension(extension.oid, critical, extension)
        _reject_duplicate_extension(extension, self._extensions)

        # Concatenation builds a new list, leaving this builder untouched.
        return OCSPRequestBuilder(
            self._request, self._extensions + [extension]
        )

    def build(self):
        """
        Create the OCSP request through the OpenSSL backend.

        :raises ValueError: if no certificate has been added
        """
        # Validate first so a misuse error does not require loading the
        # backend.
        if self._request is None:
            raise ValueError("You must add a certificate before building")

        from cryptography.hazmat.backends.openssl.backend import backend

        return backend.create_ocsp_request(self)
118
119
class _SingleResponse(object):
    """
    Validated holder for the status data of one certificate.

    The constructor only checks its arguments and stores them on private
    attributes; it raises ``TypeError`` for arguments of the wrong type and
    ``ValueError`` for disallowed values or combinations.
    """

    def __init__(
        self,
        cert,
        issuer,
        algorithm,
        cert_status,
        this_update,
        next_update,
        revocation_time,
        revocation_reason,
    ):
        both_are_certs = isinstance(cert, x509.Certificate) and isinstance(
            issuer, x509.Certificate
        )
        if not both_are_certs:
            raise TypeError("cert and issuer must be a Certificate")

        _verify_algorithm(algorithm)

        if not isinstance(this_update, datetime.datetime):
            raise TypeError("this_update must be a datetime object")

        next_update_ok = next_update is None or isinstance(
            next_update, datetime.datetime
        )
        if not next_update_ok:
            raise TypeError("next_update must be a datetime object or None")

        if not isinstance(cert_status, OCSPCertStatus):
            raise TypeError(
                "cert_status must be an item from the OCSPCertStatus enum"
            )

        if cert_status is OCSPCertStatus.REVOKED:
            # A revoked certificate needs a revocation time; the reason is
            # optional.
            if not isinstance(revocation_time, datetime.datetime):
                raise TypeError("revocation_time must be a datetime object")

            revocation_time = _convert_to_naive_utc_time(revocation_time)
            if revocation_time < _EARLIEST_UTC_TIME:
                raise ValueError(
                    "The revocation_time must be on or after"
                    " 1950 January 1."
                )

            reason_ok = revocation_reason is None or isinstance(
                revocation_reason, x509.ReasonFlags
            )
            if not reason_ok:
                raise TypeError(
                    "revocation_reason must be an item from the ReasonFlags "
                    "enum or None"
                )
        else:
            # Revocation details make no sense for any other status.
            if revocation_time is not None:
                raise ValueError(
                    "revocation_time can only be provided if the certificate "
                    "is revoked"
                )
            if revocation_reason is not None:
                raise ValueError(
                    "revocation_reason can only be provided if the certificate"
                    " is revoked"
                )

        # Every argument passed validation; record the state.
        self._cert = cert
        self._issuer = issuer
        self._algorithm = algorithm
        self._this_update = this_update
        self._next_update = next_update
        self._cert_status = cert_status
        self._revocation_time = revocation_time
        self._revocation_reason = revocation_reason
188
189
class OCSPResponseBuilder(object):
    """
    Builder for OCSP responses.

    The configuration methods never modify the builder they are called on;
    each returns a new ``OCSPResponseBuilder`` carrying the accumulated
    state.
    """

    def __init__(
        self, response=None, responder_id=None, certs=None, extensions=None
    ):
        """
        :param response: a ``_SingleResponse`` or ``None``
        :param responder_id: a ``(responder_cert, encoding)`` tuple or
            ``None``
        :param certs: list of certificates to include, or ``None``
        :param extensions: list of ``x509.Extension`` objects already added;
            ``None`` (the default) means "no extensions"
        """
        self._response = response
        self._responder_id = responder_id
        self._certs = certs
        # Give every default-constructed builder its own list. A literal
        # ``[]`` default would be one list object shared by all of them, so
        # anything mutating ``_extensions`` on one builder would leak into
        # every other.
        self._extensions = [] if extensions is None else extensions

    def add_response(
        self,
        cert,
        issuer,
        algorithm,
        cert_status,
        this_update,
        next_update,
        revocation_time,
        revocation_reason,
    ):
        """
        Return a new builder holding a single response for ``cert``.

        The arguments are validated by ``_SingleResponse``, which raises
        ``TypeError`` or ``ValueError`` for invalid input.

        :raises ValueError: if a response was already added
        """
        if self._response is not None:
            raise ValueError("Only one response per OCSPResponse.")

        singleresp = _SingleResponse(
            cert,
            issuer,
            algorithm,
            cert_status,
            this_update,
            next_update,
            revocation_time,
            revocation_reason,
        )
        return OCSPResponseBuilder(
            singleresp,
            self._responder_id,
            self._certs,
            self._extensions,
        )

    def responder_id(self, encoding, responder_cert):
        """
        Return a new builder with the responder identity set.

        :param encoding: an ``OCSPResponderEncoding`` member
        :param responder_cert: the responder's ``x509.Certificate``
        :raises ValueError: if a responder id was already set
        :raises TypeError: if either argument has the wrong type
        """
        if self._responder_id is not None:
            raise ValueError("responder_id can only be set once")
        if not isinstance(responder_cert, x509.Certificate):
            raise TypeError("responder_cert must be a Certificate")
        if not isinstance(encoding, OCSPResponderEncoding):
            raise TypeError(
                "encoding must be an element from OCSPResponderEncoding"
            )

        return OCSPResponseBuilder(
            self._response,
            (responder_cert, encoding),
            self._certs,
            self._extensions,
        )

    def certificates(self, certs):
        """
        Return a new builder with the given certificates attached.

        :param certs: non-empty iterable of ``x509.Certificate`` objects
        :raises ValueError: if certificates were already set or ``certs``
            is empty
        :raises TypeError: if any item is not a Certificate
        """
        if self._certs is not None:
            raise ValueError("certificates may only be set once")
        # Materialize once so any iterable (including a generator) can be
        # checked and then stored.
        certs = list(certs)
        if not certs:
            raise ValueError("certs must not be an empty list")
        if not all(isinstance(x, x509.Certificate) for x in certs):
            raise TypeError("certs must be a list of Certificates")
        return OCSPResponseBuilder(
            self._response,
            self._responder_id,
            certs,
            self._extensions,
        )

    def add_extension(self, extension, critical):
        """
        Return a new builder with ``extension`` appended.

        :param extension: an ``x509.ExtensionType`` instance
        :param critical: criticality flag stored on the wrapping
            ``x509.Extension``
        :raises TypeError: if ``extension`` is not an ExtensionType
        """
        if not isinstance(extension, x509.ExtensionType):
            raise TypeError("extension must be an ExtensionType")

        extension = x509.Extension(extension.oid, critical, extension)
        _reject_duplicate_extension(extension, self._extensions)

        # Concatenation builds a new list, leaving this builder untouched.
        return OCSPResponseBuilder(
            self._response,
            self._responder_id,
            self._certs,
            self._extensions + [extension],
        )

    def sign(self, private_key, algorithm):
        """
        Create a SUCCESSFUL OCSP response through the OpenSSL backend.

        ``private_key`` and ``algorithm`` are passed to
        ``backend.create_ocsp_response`` unchanged.

        :raises ValueError: if no response or no responder id was added
        """
        # Validate first so a misuse error does not require loading the
        # backend.
        if self._response is None:
            raise ValueError("You must add a response before signing")
        if self._responder_id is None:
            raise ValueError("You must add a responder_id before signing")

        from cryptography.hazmat.backends.openssl.backend import backend

        return backend.create_ocsp_response(
            OCSPResponseStatus.SUCCESSFUL, self, private_key, algorithm
        )

    @classmethod
    def build_unsuccessful(cls, response_status):
        """
        Create an OCSP response that carries only a non-successful status.

        :param response_status: an ``OCSPResponseStatus`` member other than
            ``SUCCESSFUL``
        :raises TypeError: if ``response_status`` is not an
            ``OCSPResponseStatus``
        :raises ValueError: if ``response_status`` is ``SUCCESSFUL``
        """
        # Validate first so a misuse error does not require loading the
        # backend.
        if not isinstance(response_status, OCSPResponseStatus):
            raise TypeError(
                "response_status must be an item from OCSPResponseStatus"
            )
        if response_status is OCSPResponseStatus.SUCCESSFUL:
            raise ValueError("response_status cannot be SUCCESSFUL")

        from cryptography.hazmat.backends.openssl.backend import backend

        return backend.create_ocsp_response(response_status, None, None, None)
300
301
@six.add_metaclass(abc.ABCMeta)
class OCSPRequest(object):
    """
    Abstract interface describing an OCSP request.
    """

    @abc.abstractproperty
    def issuer_key_hash(self):
        """
        Hash of the issuer's public key.
        """

    @abc.abstractproperty
    def issuer_name_hash(self):
        """
        Hash of the issuer's name.
        """

    @abc.abstractproperty
    def hash_algorithm(self):
        """
        Hash algorithm that produced the issuer name and key hashes.
        """

    @abc.abstractproperty
    def serial_number(self):
        """
        Serial number of the certificate whose status is requested.
        """

    @abc.abstractmethod
    def public_bytes(self, encoding):
        """
        Serialize the request to DER.
        """

    @abc.abstractproperty
    def extensions(self):
        """
        Request-level extensions (these are not the single request
        extensions).
        """
339
340
@six.add_metaclass(abc.ABCMeta)
class OCSPResponse(object):
    """
    Abstract interface describing an OCSP response.
    """

    @abc.abstractproperty
    def response_status(self):
        """
        Status of the response, as a member of the OCSPResponseStatus
        enumeration.
        """

    @abc.abstractproperty
    def signature_algorithm_oid(self):
        """
        ObjectIdentifier of the signature algorithm.
        """

    @abc.abstractproperty
    def signature_hash_algorithm(self):
        """
        HashAlgorithm corresponding to the type of the digest that was
        signed.
        """

    @abc.abstractproperty
    def signature(self):
        """
        Bytes of the signature.
        """

    @abc.abstractproperty
    def tbs_response_bytes(self):
        """
        Bytes of the tbsResponseData.
        """

    @abc.abstractproperty
    def certificates(self):
        """
        List of certificates that help build a chain for verifying the OCSP
        response. They are present when the OCSP responder uses a delegate
        certificate.
        """

    @abc.abstractproperty
    def responder_key_hash(self):
        """
        Key hash of the responder, or None.
        """

    @abc.abstractproperty
    def responder_name(self):
        """
        Name of the responder, or None.
        """

    @abc.abstractproperty
    def produced_at(self):
        """
        Time at which the response was produced.
        """

    @abc.abstractproperty
    def certificate_status(self):
        """
        Status of the certificate, as a member of the OCSPCertStatus enum.
        """

    @abc.abstractproperty
    def revocation_time(self):
        """
        Date the certificate was revoked, or None if it is not revoked.
        """

    @abc.abstractproperty
    def revocation_reason(self):
        """
        Reason the certificate was revoked, or None if no reason was
        specified or the certificate is not revoked.
        """

    @abc.abstractproperty
    def this_update(self):
        """
        Most recent time at which the responder knew the indicated status
        to be correct.
        """

    @abc.abstractproperty
    def next_update(self):
        """
        Time at which newer information will be available.
        """

    @abc.abstractproperty
    def issuer_key_hash(self):
        """
        Hash of the issuer's public key.
        """

    @abc.abstractproperty
    def issuer_name_hash(self):
        """
        Hash of the issuer's name.
        """

    @abc.abstractproperty
    def hash_algorithm(self):
        """
        Hash algorithm that produced the issuer name and key hashes.
        """

    @abc.abstractproperty
    def serial_number(self):
        """
        Serial number of the certificate whose status was checked.
        """

    @abc.abstractproperty
    def extensions(self):
        """
        Response-level extensions (these are not the single response
        extensions).
        """

    @abc.abstractproperty
    def single_extensions(self):
        """
        Single response extensions (these are not the response-level
        extensions).
        """
468