1# This file is dual licensed under the terms of the Apache License, Version 2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 3# for complete details. 4 5from __future__ import absolute_import, division, print_function 6 7import abc 8 9import six 10 11from cryptography import utils 12from cryptography.exceptions import AlreadyFinalized 13from cryptography.hazmat.bindings._padding import lib 14 15 16@six.add_metaclass(abc.ABCMeta) 17class PaddingContext(object): 18 @abc.abstractmethod 19 def update(self, data): 20 """ 21 Pads the provided bytes and returns any available data as bytes. 22 """ 23 24 @abc.abstractmethod 25 def finalize(self): 26 """ 27 Finalize the padding, returns bytes. 28 """ 29 30 31def _byte_padding_check(block_size): 32 if not (0 <= block_size <= 2040): 33 raise ValueError("block_size must be in range(0, 2041).") 34 35 if block_size % 8 != 0: 36 raise ValueError("block_size must be a multiple of 8.") 37 38 39def _byte_padding_update(buffer_, data, block_size): 40 if buffer_ is None: 41 raise AlreadyFinalized("Context was already finalized.") 42 43 utils._check_bytes("data", data) 44 45 buffer_ += data 46 47 finished_blocks = len(buffer_) // (block_size // 8) 48 49 result = buffer_[:finished_blocks * (block_size // 8)] 50 buffer_ = buffer_[finished_blocks * (block_size // 8):] 51 52 return buffer_, result 53 54 55def _byte_padding_pad(buffer_, block_size, paddingfn): 56 if buffer_ is None: 57 raise AlreadyFinalized("Context was already finalized.") 58 59 pad_size = block_size // 8 - len(buffer_) 60 return buffer_ + paddingfn(pad_size) 61 62 63def _byte_unpadding_update(buffer_, data, block_size): 64 if buffer_ is None: 65 raise AlreadyFinalized("Context was already finalized.") 66 67 utils._check_bytes("data", data) 68 69 buffer_ += data 70 71 finished_blocks = max(len(buffer_) // (block_size // 8) - 1, 0) 72 73 result = buffer_[:finished_blocks * (block_size // 8)] 74 buffer_ = buffer_[finished_blocks * (block_size // 8):] 75 76 return buffer_, result 77 78 79def _byte_unpadding_check(buffer_, block_size, checkfn): 80 if buffer_ is None: 81 raise AlreadyFinalized("Context was already finalized.") 82 83 if len(buffer_) != block_size // 8: 84 raise ValueError("Invalid padding bytes.") 85 86 valid = checkfn(buffer_, block_size // 8) 87 88 if not valid: 89 raise ValueError("Invalid padding bytes.") 90 91 pad_size = six.indexbytes(buffer_, -1) 92 return buffer_[:-pad_size] 93 94 95class PKCS7(object): 96 def __init__(self, block_size): 97 _byte_padding_check(block_size) 98 self.block_size = block_size 99 100 def padder(self): 101 return _PKCS7PaddingContext(self.block_size) 102 103 def unpadder(self): 104 return _PKCS7UnpaddingContext(self.block_size) 105 106 107@utils.register_interface(PaddingContext) 108class _PKCS7PaddingContext(object): 109 def __init__(self, block_size): 110 self.block_size = block_size 111 # TODO: more copies than necessary, we should use zero-buffer (#193) 112 self._buffer = b"" 113 114 def update(self, data): 115 self._buffer, result = _byte_padding_update( 116 self._buffer, data, self.block_size) 117 return result 118 119 def _padding(self, size): 120 return six.int2byte(size) * size 121 122 def finalize(self): 123 result = _byte_padding_pad( 124 self._buffer, self.block_size, self._padding) 125 self._buffer = None 126 return result 127 128 129@utils.register_interface(PaddingContext) 130class _PKCS7UnpaddingContext(object): 131 def __init__(self, block_size): 132 self.block_size = block_size 133 # TODO: more copies than necessary, we should use zero-buffer (#193) 134 self._buffer = b"" 135 136 def update(self, data): 137 self._buffer, result = _byte_unpadding_update( 138 self._buffer, data, self.block_size) 139 return result 140 141 def finalize(self): 142 result = _byte_unpadding_check( 143 self._buffer, self.block_size, 144 lib.Cryptography_check_pkcs7_padding) 145 self._buffer = None 146 return result 147 148 149class ANSIX923(object): 150 def __init__(self, block_size): 151 _byte_padding_check(block_size) 152 self.block_size = block_size 153 154 def padder(self): 155 return _ANSIX923PaddingContext(self.block_size) 156 157 def unpadder(self): 158 return _ANSIX923UnpaddingContext(self.block_size) 159 160 161@utils.register_interface(PaddingContext) 162class _ANSIX923PaddingContext(object): 163 def __init__(self, block_size): 164 self.block_size = block_size 165 # TODO: more copies than necessary, we should use zero-buffer (#193) 166 self._buffer = b"" 167 168 def update(self, data): 169 self._buffer, result = _byte_padding_update( 170 self._buffer, data, self.block_size) 171 return result 172 173 def _padding(self, size): 174 return six.int2byte(0) * (size - 1) + six.int2byte(size) 175 176 def finalize(self): 177 result = _byte_padding_pad( 178 self._buffer, self.block_size, self._padding) 179 self._buffer = None 180 return result 181 182 183@utils.register_interface(PaddingContext) 184class _ANSIX923UnpaddingContext(object): 185 def __init__(self, block_size): 186 self.block_size = block_size 187 # TODO: more copies than necessary, we should use zero-buffer (#193) 188 self._buffer = b"" 189 190 def update(self, data): 191 self._buffer, result = _byte_unpadding_update( 192 self._buffer, data, self.block_size) 193 return result 194 195 def finalize(self): 196 result = _byte_unpadding_check( 197 self._buffer, self.block_size, 198 lib.Cryptography_check_ansix923_padding) 199 self._buffer = None 200 return result 201