# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.

from __future__ import absolute_import, division, print_function


from cryptography import utils
from cryptography.exceptions import (
    InvalidSignature,
    UnsupportedAlgorithm,
    _Reasons,
)
from cryptography.hazmat.primitives import constant_time, hashes


@utils.register_interface(hashes.HashContext)
class _HMACContext(object):
    """HMAC context that drives an OpenSSL ``HMAC_CTX`` through a backend.

    The backend object is expected to expose ``_lib``, ``_ffi``,
    ``openssl_assert`` and ``_evp_md_from_algorithm``; those are the only
    backend members this class touches.
    """

    def __init__(self, backend, key, algorithm, ctx=None):
        """Create a context, keying a fresh ``HMAC_CTX`` unless one is given.

        :param backend: Backend providing the OpenSSL bindings.
        :param key: Buffer holding the HMAC key.
        :param algorithm: Hash algorithm object; its ``name`` is used in the
            error message and its ``digest_size`` is checked in ``finalize``.
        :param ctx: Optional already-initialised ``HMAC_CTX``. When supplied
            it is adopted as-is and ``key`` is only stored, not re-applied.
        :raises UnsupportedAlgorithm: If the backend has no ``EVP_MD`` for
            ``algorithm`` (only checked when ``ctx`` is ``None``).
        """
        self._algorithm = algorithm
        self._backend = backend

        if ctx is None:
            ctx = self._init_keyed_ctx(key, algorithm)

        self._ctx = ctx
        # Retained so that copy() can hand the key to the duplicate context.
        self._key = key

    algorithm = utils.read_only_property("_algorithm")

    def _alloc_ctx(self):
        """Allocate an ``HMAC_CTX`` that is freed when garbage collected."""
        lib = self._backend._lib
        ffi = self._backend._ffi

        raw_ctx = lib.HMAC_CTX_new()
        self._backend.openssl_assert(raw_ctx != ffi.NULL)
        return ffi.gc(raw_ctx, lib.HMAC_CTX_free)

    def _init_keyed_ctx(self, key, algorithm):
        """Allocate an ``HMAC_CTX`` and initialise it with key and digest."""
        backend = self._backend
        ffi = backend._ffi

        new_ctx = self._alloc_ctx()

        md = backend._evp_md_from_algorithm(algorithm)
        if md == ffi.NULL:
            raise UnsupportedAlgorithm(
                "{} is not a supported hash on this backend".format(
                    algorithm.name
                ),
                _Reasons.UNSUPPORTED_HASH,
            )

        key_buf = ffi.from_buffer(key)
        status = backend._lib.HMAC_Init_ex(
            new_ctx, key_buf, len(key), md, ffi.NULL
        )
        backend.openssl_assert(status != 0)
        return new_ctx

    def copy(self):
        """Return a new ``_HMACContext`` wrapping a copy of the current state.

        The underlying ``HMAC_CTX`` is duplicated with ``HMAC_CTX_copy``; the
        new object shares this one's backend, key and algorithm.
        """
        backend = self._backend

        duplicate = self._alloc_ctx()
        status = backend._lib.HMAC_CTX_copy(duplicate, self._ctx)
        backend.openssl_assert(status != 0)

        return _HMACContext(
            backend, self._key, self.algorithm, ctx=duplicate
        )

    def update(self, data):
        """Feed ``data`` (a buffer) into the running HMAC computation."""
        backend = self._backend

        data_buf = backend._ffi.from_buffer(data)
        status = backend._lib.HMAC_Update(self._ctx, data_buf, len(data))
        backend.openssl_assert(status != 0)

    def finalize(self):
        """Finish the HMAC and return the digest.

        The output length reported by ``HMAC_Final`` is asserted to equal
        ``self.algorithm.digest_size`` before the result is sliced out of the
        scratch buffer.
        """
        backend = self._backend
        lib = backend._lib
        ffi = backend._ffi

        # Scratch buffer sized for the largest digest OpenSSL can emit.
        digest_buf = ffi.new("unsigned char[]", lib.EVP_MAX_MD_SIZE)
        digest_len = ffi.new("unsigned int *")

        status = lib.HMAC_Final(self._ctx, digest_buf, digest_len)
        backend.openssl_assert(status != 0)
        backend.openssl_assert(digest_len[0] == self.algorithm.digest_size)

        return ffi.buffer(digest_buf)[: digest_len[0]]

    def verify(self, signature):
        """Finalize and compare the digest against ``signature``.

        The comparison goes through ``constant_time.bytes_eq``.

        :raises InvalidSignature: If the digest and ``signature`` differ.
        """
        computed = self.finalize()
        if constant_time.bytes_eq(computed, signature):
            return
        raise InvalidSignature("Signature did not match digest.")