1# This file is dual licensed under the terms of the Apache License, Version 2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 3# for complete details. 4 5from __future__ import absolute_import, division, print_function 6 7 8from cryptography import utils 9from cryptography.exceptions import ( 10 InvalidSignature, UnsupportedAlgorithm, _Reasons 11) 12from cryptography.hazmat.primitives import constant_time, mac 13from cryptography.hazmat.primitives.ciphers.modes import CBC 14 15 16@utils.register_interface(mac.MACContext) 17class _CMACContext(object): 18 def __init__(self, backend, algorithm, ctx=None): 19 if not backend.cmac_algorithm_supported(algorithm): 20 raise UnsupportedAlgorithm("This backend does not support CMAC.", 21 _Reasons.UNSUPPORTED_CIPHER) 22 23 self._backend = backend 24 self._key = algorithm.key 25 self._algorithm = algorithm 26 self._output_length = algorithm.block_size // 8 27 28 if ctx is None: 29 registry = self._backend._cipher_registry 30 adapter = registry[type(algorithm), CBC] 31 32 evp_cipher = adapter(self._backend, algorithm, CBC) 33 34 ctx = self._backend._lib.CMAC_CTX_new() 35 36 self._backend.openssl_assert(ctx != self._backend._ffi.NULL) 37 ctx = self._backend._ffi.gc(ctx, self._backend._lib.CMAC_CTX_free) 38 39 key_ptr = self._backend._ffi.from_buffer(self._key) 40 res = self._backend._lib.CMAC_Init( 41 ctx, key_ptr, len(self._key), 42 evp_cipher, self._backend._ffi.NULL 43 ) 44 self._backend.openssl_assert(res == 1) 45 46 self._ctx = ctx 47 48 algorithm = utils.read_only_property("_algorithm") 49 50 def update(self, data): 51 res = self._backend._lib.CMAC_Update(self._ctx, data, len(data)) 52 self._backend.openssl_assert(res == 1) 53 54 def finalize(self): 55 buf = self._backend._ffi.new("unsigned char[]", self._output_length) 56 length = self._backend._ffi.new("size_t *", self._output_length) 57 res = self._backend._lib.CMAC_Final( 58 self._ctx, buf, length 59 ) 60 self._backend.openssl_assert(res == 1) 61 62 self._ctx = None 63 64 return self._backend._ffi.buffer(buf)[:] 65 66 def copy(self): 67 copied_ctx = self._backend._lib.CMAC_CTX_new() 68 copied_ctx = self._backend._ffi.gc( 69 copied_ctx, self._backend._lib.CMAC_CTX_free 70 ) 71 res = self._backend._lib.CMAC_CTX_copy( 72 copied_ctx, self._ctx 73 ) 74 self._backend.openssl_assert(res == 1) 75 return _CMACContext( 76 self._backend, self._algorithm, ctx=copied_ctx 77 ) 78 79 def verify(self, signature): 80 digest = self.finalize() 81 if not constant_time.bytes_eq(digest, signature): 82 raise InvalidSignature("Signature did not match digest.") 83