1# This file is dual licensed under the terms of the Apache License, Version 2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 3# for complete details. 4 5from __future__ import absolute_import, division, print_function 6 7 8from cryptography import utils 9from cryptography.exceptions import ( 10 InvalidSignature, UnsupportedAlgorithm, _Reasons 11) 12from cryptography.hazmat.primitives import constant_time, hashes, mac 13 14 15@utils.register_interface(mac.MACContext) 16@utils.register_interface(hashes.HashContext) 17class _HMACContext(object): 18 def __init__(self, backend, key, algorithm, ctx=None): 19 self._algorithm = algorithm 20 self._backend = backend 21 22 if ctx is None: 23 ctx = self._backend._lib.Cryptography_HMAC_CTX_new() 24 self._backend.openssl_assert(ctx != self._backend._ffi.NULL) 25 ctx = self._backend._ffi.gc( 26 ctx, self._backend._lib.Cryptography_HMAC_CTX_free 27 ) 28 evp_md = self._backend._evp_md_from_algorithm(algorithm) 29 if evp_md == self._backend._ffi.NULL: 30 raise UnsupportedAlgorithm( 31 "{0} is not a supported hash on this backend".format( 32 algorithm.name), 33 _Reasons.UNSUPPORTED_HASH 34 ) 35 key_ptr = self._backend._ffi.from_buffer(key) 36 res = self._backend._lib.HMAC_Init_ex( 37 ctx, key_ptr, len(key), evp_md, self._backend._ffi.NULL 38 ) 39 self._backend.openssl_assert(res != 0) 40 41 self._ctx = ctx 42 self._key = key 43 44 algorithm = utils.read_only_property("_algorithm") 45 46 def copy(self): 47 copied_ctx = self._backend._lib.Cryptography_HMAC_CTX_new() 48 self._backend.openssl_assert(copied_ctx != self._backend._ffi.NULL) 49 copied_ctx = self._backend._ffi.gc( 50 copied_ctx, self._backend._lib.Cryptography_HMAC_CTX_free 51 ) 52 res = self._backend._lib.HMAC_CTX_copy(copied_ctx, self._ctx) 53 self._backend.openssl_assert(res != 0) 54 return _HMACContext( 55 self._backend, self._key, self.algorithm, ctx=copied_ctx 56 ) 57 58 def update(self, data): 59 data_ptr = self._backend._ffi.from_buffer(data) 60 res = self._backend._lib.HMAC_Update(self._ctx, data_ptr, len(data)) 61 self._backend.openssl_assert(res != 0) 62 63 def finalize(self): 64 buf = self._backend._ffi.new("unsigned char[]", 65 self._backend._lib.EVP_MAX_MD_SIZE) 66 outlen = self._backend._ffi.new("unsigned int *") 67 res = self._backend._lib.HMAC_Final(self._ctx, buf, outlen) 68 self._backend.openssl_assert(res != 0) 69 self._backend.openssl_assert(outlen[0] == self.algorithm.digest_size) 70 return self._backend._ffi.buffer(buf)[:outlen[0]] 71 72 def verify(self, signature): 73 digest = self.finalize() 74 if not constant_time.bytes_eq(digest, signature): 75 raise InvalidSignature("Signature did not match digest.") 76