• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
4
5from __future__ import absolute_import, division, print_function
6
7import abc
8
9import six
10
11from cryptography import utils
12from cryptography.exceptions import (
13    AlreadyFinalized,
14    AlreadyUpdated,
15    NotYetFinalized,
16    UnsupportedAlgorithm,
17    _Reasons,
18)
19from cryptography.hazmat.backends import _get_backend
20from cryptography.hazmat.backends.interfaces import CipherBackend
21from cryptography.hazmat.primitives.ciphers import modes
22
23
@six.add_metaclass(abc.ABCMeta)
class CipherAlgorithm(object):
    """
    Abstract interface describing a symmetric cipher algorithm.
    """

    @abc.abstractproperty
    def name(self):
        """
        The name of this algorithm as a string (e.g. "AES", "Camellia").
        """

    @abc.abstractproperty
    def key_size(self):
        """
        The length of the key in use, as an integer number of bits
        (e.g. 128, 256).
        """
37
38
@six.add_metaclass(abc.ABCMeta)
class BlockCipherAlgorithm(object):
    """
    Abstract interface for cipher algorithms that operate on fixed-size
    blocks.
    """

    @abc.abstractproperty
    def block_size(self):
        """
        The length of a single block, as an integer number of bits
        (e.g. 64, 128).
        """
46
47
@six.add_metaclass(abc.ABCMeta)
class CipherContext(object):
    """
    Abstract interface for an in-progress encryption or decryption
    operation.
    """

    @abc.abstractmethod
    def update(self, data):
        """
        Runs the given bytes through the cipher and returns the output as
        bytes.
        """

    @abc.abstractmethod
    def update_into(self, data, buf):
        """
        Runs the given bytes through the cipher, storing the output in the
        supplied buffer. Returns how many bytes were written.
        """

    @abc.abstractmethod
    def finalize(self):
        """
        Processes the final block and returns its output as bytes.
        """
69
70
@six.add_metaclass(abc.ABCMeta)
class AEADCipherContext(object):
    """
    Abstract interface for contexts that accept additional authenticated
    data.
    """

    @abc.abstractmethod
    def authenticate_additional_data(self, data):
        """
        Includes the given bytes in the authentication computation.
        """
78
79
@six.add_metaclass(abc.ABCMeta)
class AEADDecryptionContext(object):
    """
    Abstract interface for decryption contexts that can receive the
    authentication tag at finalization time.
    """

    @abc.abstractmethod
    def finalize_with_tag(self, tag):
        """
        Processes the final block and returns its output as bytes. The
        authentication tag is supplied here rather than up front.
        """
88
89
@six.add_metaclass(abc.ABCMeta)
class AEADEncryptionContext(object):
    """
    Abstract interface for encryption contexts that produce an
    authentication tag.
    """

    @abc.abstractproperty
    def tag(self):
        """
        The authentication tag as bytes. It can only be read once
        encryption has been finalized.
        """
98
99
class Cipher(object):
    """
    Combines a cipher algorithm, an optional mode and a backend, and creates
    encryption / decryption contexts for that combination.
    """

    def __init__(self, algorithm, mode, backend=None):
        """
        :param algorithm: A ``CipherAlgorithm`` instance.
        :param mode: A mode object, or ``None``. When given, its
            ``validate_for_algorithm`` is called with ``algorithm``; any
            exception it raises propagates to the caller.
        :param backend: Passed through ``_get_backend`` to obtain the
            backend actually used.
        :raises UnsupportedAlgorithm: If the resolved backend is not a
            ``CipherBackend``.
        :raises TypeError: If ``algorithm`` is not a ``CipherAlgorithm``.
        """
        resolved_backend = _get_backend(backend)
        if not isinstance(resolved_backend, CipherBackend):
            raise UnsupportedAlgorithm(
                "Backend object does not implement CipherBackend.",
                _Reasons.BACKEND_MISSING_INTERFACE,
            )

        if not isinstance(algorithm, CipherAlgorithm):
            raise TypeError("Expected interface of CipherAlgorithm.")

        if mode is not None:
            mode.validate_for_algorithm(algorithm)

        self._backend = resolved_backend
        self.algorithm = algorithm
        self.mode = mode

    def encryptor(self):
        """
        Creates an encryption context from the backend and returns it
        wrapped.

        :raises ValueError: If the mode carries an authentication tag and
            that tag is already set.
        """
        has_tag_mode = isinstance(self.mode, modes.ModeWithAuthenticationTag)
        if has_tag_mode and self.mode.tag is not None:
            raise ValueError(
                "Authentication tag must be None when encrypting."
            )
        backend_ctx = self._backend.create_symmetric_encryption_ctx(
            self.algorithm, self.mode
        )
        return self._wrap_ctx(backend_ctx, encrypt=True)

    def decryptor(self):
        """
        Creates a decryption context from the backend and returns it
        wrapped.
        """
        backend_ctx = self._backend.create_symmetric_decryption_ctx(
            self.algorithm, self.mode
        )
        return self._wrap_ctx(backend_ctx, encrypt=False)

    def _wrap_ctx(self, ctx, encrypt):
        """
        Picks the wrapper class for a backend context: the plain wrapper
        unless the mode has an authentication tag, otherwise the AEAD
        wrapper matching the direction.
        """
        if not isinstance(self.mode, modes.ModeWithAuthenticationTag):
            return _CipherContext(ctx)
        if encrypt:
            return _AEADEncryptionContext(ctx)
        return _AEADCipherContext(ctx)
144
145
@utils.register_interface(CipherContext)
class _CipherContext(object):
    """
    Thin wrapper over a backend cipher context that refuses any further use
    once it has been finalized.
    """

    def __init__(self, ctx):
        # The backend context; reset to None by a successful finalize().
        self._ctx = ctx

    def _require_ctx(self):
        """
        Returns the backend context, raising AlreadyFinalized if it has
        already been released.
        """
        active = self._ctx
        if active is None:
            raise AlreadyFinalized("Context was already finalized.")
        return active

    def update(self, data):
        """
        Forwards ``data`` to the backend context's ``update`` and returns
        its result.
        """
        return self._require_ctx().update(data)

    def update_into(self, data, buf):
        """
        Forwards ``data`` and ``buf`` to the backend context's
        ``update_into`` and returns its result.
        """
        return self._require_ctx().update_into(data, buf)

    def finalize(self):
        """
        Finalizes the backend context, releases it and returns the final
        output.
        """
        remainder = self._require_ctx().finalize()
        self._ctx = None
        return remainder
167
168
@utils.register_interface(AEADCipherContext)
@utils.register_interface(CipherContext)
@utils.register_interface(AEADDecryptionContext)
class _AEADCipherContext(object):
    """
    Wrapper over a backend context that also handles additional
    authenticated data and an authentication tag.

    It keeps running totals of the payload and AAD bytes accepted so far so
    that the limits declared on the backend context's mode
    (``_MAX_ENCRYPTED_BYTES`` and ``_MAX_AAD_BYTES``) can be enforced, and
    it releases the backend context once finalized.
    """

    def __init__(self, ctx):
        self._ctx = ctx
        # Running totals of accepted bytes, compared against the mode's
        # limits on every call.
        self._bytes_processed = 0
        self._aad_bytes_processed = 0
        # Copied from the backend context during finalization.
        self._tag = None
        # Set by the first update attempt; AAD is refused from then on.
        self._updated = False

    def _check_limit(self, data_size):
        """
        Accounts for an update of ``data_size`` bytes.

        :raises AlreadyFinalized: If the context has been finalized.
        :raises ValueError: If accepting ``data_size`` more bytes would
            exceed the mode's maximum encrypted byte count. A rejected call
            is not added to the running total.
        """
        if self._ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        self._updated = True
        mode = self._ctx._mode
        new_total = self._bytes_processed + data_size
        if new_total > mode._MAX_ENCRYPTED_BYTES:
            raise ValueError(
                "{} has a maximum encrypted byte limit of {}".format(
                    mode.name, mode._MAX_ENCRYPTED_BYTES
                )
            )
        # Commit only after the check passes so that rejected data, which
        # is never handed to the backend, does not inflate the counter.
        self._bytes_processed = new_total

    def update(self, data):
        """
        Checks the byte limit, then forwards ``data`` to the backend
        context's ``update`` and returns its result.
        """
        self._check_limit(len(data))
        return self._ctx.update(data)

    def update_into(self, data, buf):
        """
        Checks the byte limit, then forwards ``data`` and ``buf`` to the
        backend context's ``update_into`` and returns its result.
        """
        self._check_limit(len(data))
        return self._ctx.update_into(data, buf)

    def finalize(self):
        """
        Finalizes the backend context, keeps its tag, releases it and
        returns the final output.

        :raises AlreadyFinalized: If the context has been finalized.
        """
        if self._ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        data = self._ctx.finalize()
        self._tag = self._ctx.tag
        self._ctx = None
        return data

    def finalize_with_tag(self, tag):
        """
        Finalizes the backend context with the supplied ``tag``, keeps the
        backend's tag, releases the context and returns the final output.

        :raises AlreadyFinalized: If the context has been finalized.
        """
        if self._ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        data = self._ctx.finalize_with_tag(tag)
        self._tag = self._ctx.tag
        self._ctx = None
        return data

    def authenticate_additional_data(self, data):
        """
        Passes ``data`` to the backend context as additional authenticated
        data.

        :raises AlreadyFinalized: If the context has been finalized.
        :raises AlreadyUpdated: If an update has already been attempted.
        :raises ValueError: If accepting ``data`` would exceed the mode's
            maximum AAD byte count. A rejected call is not added to the
            running total.
        """
        if self._ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        if self._updated:
            raise AlreadyUpdated("Update has been called on this context.")

        mode = self._ctx._mode
        new_total = self._aad_bytes_processed + len(data)
        if new_total > mode._MAX_AAD_BYTES:
            raise ValueError(
                "{} has a maximum AAD byte limit of {}".format(
                    mode.name, mode._MAX_AAD_BYTES
                )
            )

        # Commit only after the check passes, for the same reason as in
        # _check_limit.
        self._aad_bytes_processed = new_total
        self._ctx.authenticate_additional_data(data)
231
232
@utils.register_interface(AEADEncryptionContext)
class _AEADEncryptionContext(_AEADCipherContext):
    """
    AEAD context used for encryption; adds read access to the tag.
    """

    @property
    def tag(self):
        """
        The stored tag.

        :raises NotYetFinalized: If the backend context is still held,
            i.e. the context has not been finalized yet.
        """
        if self._ctx is None:
            return self._tag
        raise NotYetFinalized(
            "You must finalize encryption before getting the tag."
        )
242