1# This file is dual licensed under the terms of the Apache License, Version 2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 3# for complete details. 4 5from __future__ import absolute_import, division, print_function 6 7import abc 8 9import six 10 11from cryptography import utils 12from cryptography.exceptions import AlreadyFinalized 13from cryptography.hazmat.bindings._padding import lib 14 15 16@six.add_metaclass(abc.ABCMeta) 17class PaddingContext(object): 18 @abc.abstractmethod 19 def update(self, data): 20 """ 21 Pads the provided bytes and returns any available data as bytes. 22 """ 23 24 @abc.abstractmethod 25 def finalize(self): 26 """ 27 Finalize the padding, returns bytes. 28 """ 29 30 31def _byte_padding_check(block_size): 32 if not (0 <= block_size <= 2040): 33 raise ValueError("block_size must be in range(0, 2041).") 34 35 if block_size % 8 != 0: 36 raise ValueError("block_size must be a multiple of 8.") 37 38 39def _byte_padding_update(buffer_, data, block_size): 40 if buffer_ is None: 41 raise AlreadyFinalized("Context was already finalized.") 42 43 utils._check_byteslike("data", data) 44 45 # six.PY2: Only coerce non-bytes objects to avoid triggering bad behavior 46 # of future's newbytes type. Unconditionally call bytes() after Python 2 47 # support is gone. 48 buffer_ += data if isinstance(data, bytes) else bytes(data) 49 50 finished_blocks = len(buffer_) // (block_size // 8) 51 52 result = buffer_[: finished_blocks * (block_size // 8)] 53 buffer_ = buffer_[finished_blocks * (block_size // 8) :] 54 55 return buffer_, result 56 57 58def _byte_padding_pad(buffer_, block_size, paddingfn): 59 if buffer_ is None: 60 raise AlreadyFinalized("Context was already finalized.") 61 62 pad_size = block_size // 8 - len(buffer_) 63 return buffer_ + paddingfn(pad_size) 64 65 66def _byte_unpadding_update(buffer_, data, block_size): 67 if buffer_ is None: 68 raise AlreadyFinalized("Context was already finalized.") 69 70 utils._check_byteslike("data", data) 71 72 # six.PY2: Only coerce non-bytes objects to avoid triggering bad behavior 73 # of future's newbytes type. Unconditionally call bytes() after Python 2 74 # support is gone. 75 buffer_ += data if isinstance(data, bytes) else bytes(data) 76 77 finished_blocks = max(len(buffer_) // (block_size // 8) - 1, 0) 78 79 result = buffer_[: finished_blocks * (block_size // 8)] 80 buffer_ = buffer_[finished_blocks * (block_size // 8) :] 81 82 return buffer_, result 83 84 85def _byte_unpadding_check(buffer_, block_size, checkfn): 86 if buffer_ is None: 87 raise AlreadyFinalized("Context was already finalized.") 88 89 if len(buffer_) != block_size // 8: 90 raise ValueError("Invalid padding bytes.") 91 92 valid = checkfn(buffer_, block_size // 8) 93 94 if not valid: 95 raise ValueError("Invalid padding bytes.") 96 97 pad_size = six.indexbytes(buffer_, -1) 98 return buffer_[:-pad_size] 99 100 101class PKCS7(object): 102 def __init__(self, block_size): 103 _byte_padding_check(block_size) 104 self.block_size = block_size 105 106 def padder(self): 107 return _PKCS7PaddingContext(self.block_size) 108 109 def unpadder(self): 110 return _PKCS7UnpaddingContext(self.block_size) 111 112 113@utils.register_interface(PaddingContext) 114class _PKCS7PaddingContext(object): 115 def __init__(self, block_size): 116 self.block_size = block_size 117 # TODO: more copies than necessary, we should use zero-buffer (#193) 118 self._buffer = b"" 119 120 def update(self, data): 121 self._buffer, result = _byte_padding_update( 122 self._buffer, data, self.block_size 123 ) 124 return result 125 126 def _padding(self, size): 127 return six.int2byte(size) * size 128 129 def finalize(self): 130 result = _byte_padding_pad( 131 self._buffer, self.block_size, self._padding 132 ) 133 self._buffer = None 134 return result 135 136 137@utils.register_interface(PaddingContext) 138class _PKCS7UnpaddingContext(object): 139 def __init__(self, block_size): 140 self.block_size = block_size 141 # TODO: more copies than necessary, we should use zero-buffer (#193) 142 self._buffer = b"" 143 144 def update(self, data): 145 self._buffer, result = _byte_unpadding_update( 146 self._buffer, data, self.block_size 147 ) 148 return result 149 150 def finalize(self): 151 result = _byte_unpadding_check( 152 self._buffer, self.block_size, lib.Cryptography_check_pkcs7_padding 153 ) 154 self._buffer = None 155 return result 156 157 158class ANSIX923(object): 159 def __init__(self, block_size): 160 _byte_padding_check(block_size) 161 self.block_size = block_size 162 163 def padder(self): 164 return _ANSIX923PaddingContext(self.block_size) 165 166 def unpadder(self): 167 return _ANSIX923UnpaddingContext(self.block_size) 168 169 170@utils.register_interface(PaddingContext) 171class _ANSIX923PaddingContext(object): 172 def __init__(self, block_size): 173 self.block_size = block_size 174 # TODO: more copies than necessary, we should use zero-buffer (#193) 175 self._buffer = b"" 176 177 def update(self, data): 178 self._buffer, result = _byte_padding_update( 179 self._buffer, data, self.block_size 180 ) 181 return result 182 183 def _padding(self, size): 184 return six.int2byte(0) * (size - 1) + six.int2byte(size) 185 186 def finalize(self): 187 result = _byte_padding_pad( 188 self._buffer, self.block_size, self._padding 189 ) 190 self._buffer = None 191 return result 192 193 194@utils.register_interface(PaddingContext) 195class _ANSIX923UnpaddingContext(object): 196 def __init__(self, block_size): 197 self.block_size = block_size 198 # TODO: more copies than necessary, we should use zero-buffer (#193) 199 self._buffer = b"" 200 201 def update(self, data): 202 self._buffer, result = _byte_unpadding_update( 203 self._buffer, data, self.block_size 204 ) 205 return result 206 207 def finalize(self): 208 result = _byte_unpadding_check( 209 self._buffer, 210 self.block_size, 211 lib.Cryptography_check_ansix923_padding, 212 ) 213 self._buffer = None 214 return result 215