• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
4
5from __future__ import absolute_import, division, print_function
6
7
8from cryptography import utils
9from cryptography.exceptions import (
10    InvalidSignature, UnsupportedAlgorithm, _Reasons
11)
12from cryptography.hazmat.primitives import constant_time, mac
13from cryptography.hazmat.primitives.ciphers.modes import CBC
14
15
16@utils.register_interface(mac.MACContext)
17class _CMACContext(object):
18    def __init__(self, backend, algorithm, ctx=None):
19        if not backend.cmac_algorithm_supported(algorithm):
20            raise UnsupportedAlgorithm("This backend does not support CMAC.",
21                                       _Reasons.UNSUPPORTED_CIPHER)
22
23        self._backend = backend
24        self._key = algorithm.key
25        self._algorithm = algorithm
26        self._output_length = algorithm.block_size // 8
27
28        if ctx is None:
29            registry = self._backend._cipher_registry
30            adapter = registry[type(algorithm), CBC]
31
32            evp_cipher = adapter(self._backend, algorithm, CBC)
33
34            ctx = self._backend._lib.CMAC_CTX_new()
35
36            self._backend.openssl_assert(ctx != self._backend._ffi.NULL)
37            ctx = self._backend._ffi.gc(ctx, self._backend._lib.CMAC_CTX_free)
38
39            key_ptr = self._backend._ffi.from_buffer(self._key)
40            res = self._backend._lib.CMAC_Init(
41                ctx, key_ptr, len(self._key),
42                evp_cipher, self._backend._ffi.NULL
43            )
44            self._backend.openssl_assert(res == 1)
45
46        self._ctx = ctx
47
48    algorithm = utils.read_only_property("_algorithm")
49
50    def update(self, data):
51        res = self._backend._lib.CMAC_Update(self._ctx, data, len(data))
52        self._backend.openssl_assert(res == 1)
53
54    def finalize(self):
55        buf = self._backend._ffi.new("unsigned char[]", self._output_length)
56        length = self._backend._ffi.new("size_t *", self._output_length)
57        res = self._backend._lib.CMAC_Final(
58            self._ctx, buf, length
59        )
60        self._backend.openssl_assert(res == 1)
61
62        self._ctx = None
63
64        return self._backend._ffi.buffer(buf)[:]
65
66    def copy(self):
67        copied_ctx = self._backend._lib.CMAC_CTX_new()
68        copied_ctx = self._backend._ffi.gc(
69            copied_ctx, self._backend._lib.CMAC_CTX_free
70        )
71        res = self._backend._lib.CMAC_CTX_copy(
72            copied_ctx, self._ctx
73        )
74        self._backend.openssl_assert(res == 1)
75        return _CMACContext(
76            self._backend, self._algorithm, ctx=copied_ctx
77        )
78
79    def verify(self, signature):
80        digest = self.finalize()
81        if not constant_time.bytes_eq(digest, signature):
82            raise InvalidSignature("Signature did not match digest.")
83