• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
4
5from __future__ import absolute_import, division, print_function
6
7
8from cryptography import utils
9from cryptography.exceptions import (
10    InvalidSignature,
11    UnsupportedAlgorithm,
12    _Reasons,
13)
14from cryptography.hazmat.primitives import constant_time
15from cryptography.hazmat.primitives.ciphers.modes import CBC
16
17
class _CMACContext(object):
    """CMAC computation state backed by a ``CMAC_CTX`` from the backend.

    The context is either created and keyed here, or adopted from an
    existing ``CMAC_CTX`` pointer (this is how :meth:`copy` builds its
    result).
    """

    def __init__(self, backend, algorithm, ctx=None):
        """Create a CMAC context for ``algorithm`` on ``backend``.

        :param backend: Backend object; this class uses its
            ``cmac_algorithm_supported``, ``_cipher_registry``, ``_lib``,
            ``_ffi`` and ``openssl_assert`` members.
        :param algorithm: Cipher algorithm instance; its ``key`` and
            ``block_size`` attributes are read here.
        :param ctx: Optional already-initialised ``CMAC_CTX`` pointer. When
            supplied, no new context is allocated or keyed.
        :raises UnsupportedAlgorithm: If the backend reports that it cannot
            do CMAC with ``algorithm``.
        """
        if not backend.cmac_algorithm_supported(algorithm):
            raise UnsupportedAlgorithm(
                "This backend does not support CMAC.",
                _Reasons.UNSUPPORTED_CIPHER,
            )

        self._backend = backend
        self._key = algorithm.key
        self._algorithm = algorithm
        # The tag is one cipher block long; block_size is presumably
        # expressed in bits, hence the division by 8 -- confirm against the
        # algorithm classes.
        self._output_length = algorithm.block_size // 8

        if ctx is None:
            # Look up the CBC-mode EVP cipher for this algorithm type.
            registry = self._backend._cipher_registry
            adapter = registry[type(algorithm), CBC]

            evp_cipher = adapter(self._backend, algorithm, CBC)

            ctx = self._backend._lib.CMAC_CTX_new()

            self._backend.openssl_assert(ctx != self._backend._ffi.NULL)
            # Tie the C context's lifetime to the Python object.
            ctx = self._backend._ffi.gc(ctx, self._backend._lib.CMAC_CTX_free)

            key_ptr = self._backend._ffi.from_buffer(self._key)
            res = self._backend._lib.CMAC_Init(
                ctx,
                key_ptr,
                len(self._key),
                evp_cipher,
                self._backend._ffi.NULL,
            )
            self._backend.openssl_assert(res == 1)

        self._ctx = ctx

    algorithm = utils.read_only_property("_algorithm")

    def update(self, data):
        """Feed ``data`` into the running CMAC computation."""
        res = self._backend._lib.CMAC_Update(self._ctx, data, len(data))
        self._backend.openssl_assert(res == 1)

    def finalize(self):
        """Finish the computation and return the tag.

        The context reference is dropped afterwards, so this object cannot
        be updated, copied or finalized again.
        """
        buf = self._backend._ffi.new("unsigned char[]", self._output_length)
        length = self._backend._ffi.new("size_t *", self._output_length)
        res = self._backend._lib.CMAC_Final(self._ctx, buf, length)
        self._backend.openssl_assert(res == 1)

        self._ctx = None

        # Slice the cffi buffer to get a copy of all ``_output_length``
        # bytes that no longer depends on ``buf``.
        return self._backend._ffi.buffer(buf)[:]

    def copy(self):
        """Return a new ``_CMACContext`` holding a copy of the current state."""
        copied_ctx = self._backend._lib.CMAC_CTX_new()
        # Check the allocation before the pointer is used, mirroring
        # __init__; otherwise a NULL would be passed on to CMAC_CTX_copy.
        self._backend.openssl_assert(copied_ctx != self._backend._ffi.NULL)
        copied_ctx = self._backend._ffi.gc(
            copied_ctx, self._backend._lib.CMAC_CTX_free
        )
        res = self._backend._lib.CMAC_CTX_copy(copied_ctx, self._ctx)
        self._backend.openssl_assert(res == 1)
        return _CMACContext(self._backend, self._algorithm, ctx=copied_ctx)

    def verify(self, signature):
        """Finalize and check the tag against ``signature``.

        The comparison goes through ``constant_time.bytes_eq``.

        :raises InvalidSignature: If the computed tag differs from
            ``signature``.
        """
        digest = self.finalize()
        if not constant_time.bytes_eq(digest, signature):
            raise InvalidSignature("Signature did not match digest.")
83