1# This file is dual licensed under the terms of the Apache License, Version 2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 3# for complete details. 4 5from __future__ import absolute_import, division, print_function 6 7import abc 8 9import six 10 11from cryptography import utils 12from cryptography.exceptions import ( 13 AlreadyFinalized, 14 AlreadyUpdated, 15 NotYetFinalized, 16 UnsupportedAlgorithm, 17 _Reasons, 18) 19from cryptography.hazmat.backends import _get_backend 20from cryptography.hazmat.backends.interfaces import CipherBackend 21from cryptography.hazmat.primitives.ciphers import modes 22 23 24@six.add_metaclass(abc.ABCMeta) 25class CipherAlgorithm(object): 26 @abc.abstractproperty 27 def name(self): 28 """ 29 A string naming this mode (e.g. "AES", "Camellia"). 30 """ 31 32 @abc.abstractproperty 33 def key_size(self): 34 """ 35 The size of the key being used as an integer in bits (e.g. 128, 256). 36 """ 37 38 39@six.add_metaclass(abc.ABCMeta) 40class BlockCipherAlgorithm(object): 41 @abc.abstractproperty 42 def block_size(self): 43 """ 44 The size of a block as an integer in bits (e.g. 64, 128). 45 """ 46 47 48@six.add_metaclass(abc.ABCMeta) 49class CipherContext(object): 50 @abc.abstractmethod 51 def update(self, data): 52 """ 53 Processes the provided bytes through the cipher and returns the results 54 as bytes. 55 """ 56 57 @abc.abstractmethod 58 def update_into(self, data, buf): 59 """ 60 Processes the provided bytes and writes the resulting data into the 61 provided buffer. Returns the number of bytes written. 62 """ 63 64 @abc.abstractmethod 65 def finalize(self): 66 """ 67 Returns the results of processing the final block as bytes. 68 """ 69 70 71@six.add_metaclass(abc.ABCMeta) 72class AEADCipherContext(object): 73 @abc.abstractmethod 74 def authenticate_additional_data(self, data): 75 """ 76 Authenticates the provided bytes. 77 """ 78 79 80@six.add_metaclass(abc.ABCMeta) 81class AEADDecryptionContext(object): 82 @abc.abstractmethod 83 def finalize_with_tag(self, tag): 84 """ 85 Returns the results of processing the final block as bytes and allows 86 delayed passing of the authentication tag. 87 """ 88 89 90@six.add_metaclass(abc.ABCMeta) 91class AEADEncryptionContext(object): 92 @abc.abstractproperty 93 def tag(self): 94 """ 95 Returns tag bytes. This is only available after encryption is 96 finalized. 97 """ 98 99 100class Cipher(object): 101 def __init__(self, algorithm, mode, backend=None): 102 backend = _get_backend(backend) 103 if not isinstance(backend, CipherBackend): 104 raise UnsupportedAlgorithm( 105 "Backend object does not implement CipherBackend.", 106 _Reasons.BACKEND_MISSING_INTERFACE, 107 ) 108 109 if not isinstance(algorithm, CipherAlgorithm): 110 raise TypeError("Expected interface of CipherAlgorithm.") 111 112 if mode is not None: 113 mode.validate_for_algorithm(algorithm) 114 115 self.algorithm = algorithm 116 self.mode = mode 117 self._backend = backend 118 119 def encryptor(self): 120 if isinstance(self.mode, modes.ModeWithAuthenticationTag): 121 if self.mode.tag is not None: 122 raise ValueError( 123 "Authentication tag must be None when encrypting." 124 ) 125 ctx = self._backend.create_symmetric_encryption_ctx( 126 self.algorithm, self.mode 127 ) 128 return self._wrap_ctx(ctx, encrypt=True) 129 130 def decryptor(self): 131 ctx = self._backend.create_symmetric_decryption_ctx( 132 self.algorithm, self.mode 133 ) 134 return self._wrap_ctx(ctx, encrypt=False) 135 136 def _wrap_ctx(self, ctx, encrypt): 137 if isinstance(self.mode, modes.ModeWithAuthenticationTag): 138 if encrypt: 139 return _AEADEncryptionContext(ctx) 140 else: 141 return _AEADCipherContext(ctx) 142 else: 143 return _CipherContext(ctx) 144 145 146@utils.register_interface(CipherContext) 147class _CipherContext(object): 148 def __init__(self, ctx): 149 self._ctx = ctx 150 151 def update(self, data): 152 if self._ctx is None: 153 raise AlreadyFinalized("Context was already finalized.") 154 return self._ctx.update(data) 155 156 def update_into(self, data, buf): 157 if self._ctx is None: 158 raise AlreadyFinalized("Context was already finalized.") 159 return self._ctx.update_into(data, buf) 160 161 def finalize(self): 162 if self._ctx is None: 163 raise AlreadyFinalized("Context was already finalized.") 164 data = self._ctx.finalize() 165 self._ctx = None 166 return data 167 168 169@utils.register_interface(AEADCipherContext) 170@utils.register_interface(CipherContext) 171@utils.register_interface(AEADDecryptionContext) 172class _AEADCipherContext(object): 173 def __init__(self, ctx): 174 self._ctx = ctx 175 self._bytes_processed = 0 176 self._aad_bytes_processed = 0 177 self._tag = None 178 self._updated = False 179 180 def _check_limit(self, data_size): 181 if self._ctx is None: 182 raise AlreadyFinalized("Context was already finalized.") 183 self._updated = True 184 self._bytes_processed += data_size 185 if self._bytes_processed > self._ctx._mode._MAX_ENCRYPTED_BYTES: 186 raise ValueError( 187 "{} has a maximum encrypted byte limit of {}".format( 188 self._ctx._mode.name, self._ctx._mode._MAX_ENCRYPTED_BYTES 189 ) 190 ) 191 192 def update(self, data): 193 self._check_limit(len(data)) 194 return self._ctx.update(data) 195 196 def update_into(self, data, buf): 197 self._check_limit(len(data)) 198 return self._ctx.update_into(data, buf) 199 200 def finalize(self): 201 if self._ctx is None: 202 raise AlreadyFinalized("Context was already finalized.") 203 data = self._ctx.finalize() 204 self._tag = self._ctx.tag 205 self._ctx = None 206 return data 207 208 def finalize_with_tag(self, tag): 209 if self._ctx is None: 210 raise AlreadyFinalized("Context was already finalized.") 211 data = self._ctx.finalize_with_tag(tag) 212 self._tag = self._ctx.tag 213 self._ctx = None 214 return data 215 216 def authenticate_additional_data(self, data): 217 if self._ctx is None: 218 raise AlreadyFinalized("Context was already finalized.") 219 if self._updated: 220 raise AlreadyUpdated("Update has been called on this context.") 221 222 self._aad_bytes_processed += len(data) 223 if self._aad_bytes_processed > self._ctx._mode._MAX_AAD_BYTES: 224 raise ValueError( 225 "{} has a maximum AAD byte limit of {}".format( 226 self._ctx._mode.name, self._ctx._mode._MAX_AAD_BYTES 227 ) 228 ) 229 230 self._ctx.authenticate_additional_data(data) 231 232 233@utils.register_interface(AEADEncryptionContext) 234class _AEADEncryptionContext(_AEADCipherContext): 235 @property 236 def tag(self): 237 if self._ctx is not None: 238 raise NotYetFinalized( 239 "You must finalize encryption before " "getting the tag." 240 ) 241 return self._tag 242