1# This file is dual licensed under the terms of the Apache License, Version 2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 3# for complete details. 4 5from __future__ import absolute_import, division, print_function 6 7 8from cryptography.exceptions import InvalidSignature 9from cryptography.hazmat.primitives import constant_time 10 11 12_POLY1305_TAG_SIZE = 16 13_POLY1305_KEY_SIZE = 32 14 15 16class _Poly1305Context(object): 17 def __init__(self, backend, key): 18 self._backend = backend 19 20 key_ptr = self._backend._ffi.from_buffer(key) 21 # This function copies the key into OpenSSL-owned memory so we don't 22 # need to retain it ourselves 23 evp_pkey = self._backend._lib.EVP_PKEY_new_raw_private_key( 24 self._backend._lib.NID_poly1305, 25 self._backend._ffi.NULL, 26 key_ptr, 27 len(key), 28 ) 29 self._backend.openssl_assert(evp_pkey != self._backend._ffi.NULL) 30 self._evp_pkey = self._backend._ffi.gc( 31 evp_pkey, self._backend._lib.EVP_PKEY_free 32 ) 33 ctx = self._backend._lib.EVP_MD_CTX_new() 34 self._backend.openssl_assert(ctx != self._backend._ffi.NULL) 35 self._ctx = self._backend._ffi.gc( 36 ctx, self._backend._lib.EVP_MD_CTX_free 37 ) 38 res = self._backend._lib.EVP_DigestSignInit( 39 self._ctx, 40 self._backend._ffi.NULL, 41 self._backend._ffi.NULL, 42 self._backend._ffi.NULL, 43 self._evp_pkey, 44 ) 45 self._backend.openssl_assert(res == 1) 46 47 def update(self, data): 48 data_ptr = self._backend._ffi.from_buffer(data) 49 res = self._backend._lib.EVP_DigestSignUpdate( 50 self._ctx, data_ptr, len(data) 51 ) 52 self._backend.openssl_assert(res != 0) 53 54 def finalize(self): 55 buf = self._backend._ffi.new("unsigned char[]", _POLY1305_TAG_SIZE) 56 outlen = self._backend._ffi.new("size_t *") 57 res = self._backend._lib.EVP_DigestSignFinal(self._ctx, buf, outlen) 58 self._backend.openssl_assert(res != 0) 59 self._backend.openssl_assert(outlen[0] == _POLY1305_TAG_SIZE) 60 return self._backend._ffi.buffer(buf)[: outlen[0]] 61 62 def verify(self, tag): 63 mac = self.finalize() 64 if not constant_time.bytes_eq(mac, tag): 65 raise InvalidSignature("Value did not match computed tag.") 66