• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
4
5from __future__ import absolute_import, division, print_function
6
7from cryptography import utils
8from cryptography.exceptions import (
9    InvalidSignature,
10    UnsupportedAlgorithm,
11    _Reasons,
12)
13from cryptography.hazmat.backends.openssl.utils import (
14    _calculate_digest_and_algorithm,
15    _check_not_prehashed,
16    _warn_sign_verify_deprecated,
17)
18from cryptography.hazmat.primitives import hashes, serialization
19from cryptography.hazmat.primitives.asymmetric import (
20    AsymmetricSignatureContext,
21    AsymmetricVerificationContext,
22    ec,
23)
24
25
def _check_signature_algorithm(signature_algorithm):
    """
    Ensure that ``signature_algorithm`` is an ``ec.ECDSA`` instance.

    Returns ``None`` when the check passes and raises
    ``UnsupportedAlgorithm`` (reason ``UNSUPPORTED_PUBLIC_KEY_ALGORITHM``)
    for any other object.
    """
    if not isinstance(signature_algorithm, ec.ECDSA):
        raise UnsupportedAlgorithm(
            "Unsupported elliptic curve signature algorithm.",
            _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM,
        )
32
33
def _ec_key_curve_sn(backend, ec_key):
    """
    Return the short name of the named curve used by ``ec_key``.

    The curve NID is read from the key's group and translated with
    ``OBJ_nid2sn``; the resulting C string is decoded as ASCII.

    Raises ``NotImplementedError`` when the key does not use a named curve.
    """
    group = backend._lib.EC_KEY_get0_group(ec_key)
    backend.openssl_assert(group != backend._ffi.NULL)

    nid = backend._lib.EC_GROUP_get_curve_name(group)
    # The following check is to find EC keys with unnamed curves and raise
    # an error for now.
    if nid == backend._lib.NID_undef:
        raise NotImplementedError(
            "ECDSA keys with unnamed curves are unsupported at this time"
        )

    # This is like the above check, but it also catches the case where you
    # explicitly encoded a curve with the same parameters as a named curve.
    # Don't do that. The flag is not consulted on LibreSSL builds.
    if (
        not backend._lib.CRYPTOGRAPHY_IS_LIBRESSL
        and backend._lib.EC_GROUP_get_asn1_flag(group) == 0
    ):
        raise NotImplementedError(
            "ECDSA keys with unnamed curves are unsupported at this time"
        )

    curve_name = backend._lib.OBJ_nid2sn(nid)
    backend.openssl_assert(curve_name != backend._ffi.NULL)

    sn = backend._ffi.string(curve_name).decode("ascii")
    return sn
62
63
def _mark_asn1_named_ec_curve(backend, ec_cdata):
    """
    Set the named curve flag on the EC_KEY. This causes OpenSSL to
    serialize EC keys along with their curve OID which makes
    deserialization easier.

    ``ec_cdata`` is modified in place; nothing is returned.
    """

    backend._lib.EC_KEY_set_asn1_flag(
        ec_cdata, backend._lib.OPENSSL_EC_NAMED_CURVE
    )
74
75
def _sn_to_elliptic_curve(backend, sn):
    """
    Instantiate the curve class registered under short name ``sn``.

    The class is looked up in ``ec._CURVE_TYPES`` and called with no
    arguments. ``backend`` is accepted but not used by this function.

    Raises ``UnsupportedAlgorithm`` (reason ``UNSUPPORTED_ELLIPTIC_CURVE``)
    when ``sn`` has no entry in the mapping.
    """
    try:
        return ec._CURVE_TYPES[sn]()
    except KeyError:
        raise UnsupportedAlgorithm(
            "{} is not a supported elliptic curve".format(sn),
            _Reasons.UNSUPPORTED_ELLIPTIC_CURVE,
        )
84
85
def _ecdsa_sig_sign(backend, private_key, data):
    """
    Sign ``data`` with ``private_key`` using ``ECDSA_sign``.

    ``data`` is passed to OpenSSL unchanged, so any hashing must already
    have been done by the caller. Returns the signature as bytes.
    """
    max_size = backend._lib.ECDSA_size(private_key._ec_key)
    backend.openssl_assert(max_size > 0)

    # The buffer is sized from ECDSA_size(); the number of bytes actually
    # written is reported back through siglen_ptr.
    sigbuf = backend._ffi.new("unsigned char[]", max_size)
    siglen_ptr = backend._ffi.new("unsigned int[]", 1)
    res = backend._lib.ECDSA_sign(
        0, data, len(data), sigbuf, siglen_ptr, private_key._ec_key
    )
    backend.openssl_assert(res == 1)
    # Trim the buffer to the reported signature length.
    return backend._ffi.buffer(sigbuf)[: siglen_ptr[0]]
97
98
def _ecdsa_sig_verify(backend, public_key, signature, data):
    """
    Check ``signature`` over ``data`` with ``public_key`` using
    ``ECDSA_verify``.

    ``data`` is passed to OpenSSL unchanged. Returns ``None`` on success and
    raises ``InvalidSignature`` for any result other than 1.
    """
    res = backend._lib.ECDSA_verify(
        0, data, len(data), signature, len(signature), public_key._ec_key
    )
    if res != 1:
        # Clear whatever the failed call left on the backend's error queue
        # before reporting the failure.
        backend._consume_errors()
        raise InvalidSignature
106
107
@utils.register_interface(AsymmetricSignatureContext)
class _ECDSASignatureContext(object):
    """
    Incremental ECDSA signer: feeds data into a hash and signs the digest
    when finalized.
    """

    def __init__(self, backend, private_key, algorithm):
        self._backend = backend
        self._private_key = private_key
        self._digest = hashes.Hash(algorithm, backend)

    def update(self, data):
        """
        Feed ``data`` into the running hash.
        """
        self._digest.update(data)

    def finalize(self):
        """
        Finalize the hash and return the ECDSA signature of the digest.
        """
        digest = self._digest.finalize()

        return _ecdsa_sig_sign(self._backend, self._private_key, digest)
122
123
@utils.register_interface(AsymmetricVerificationContext)
class _ECDSAVerificationContext(object):
    """
    Incremental ECDSA verifier: feeds data into a hash and checks the stored
    signature against the digest when ``verify`` is called.
    """

    def __init__(self, backend, public_key, signature, algorithm):
        self._backend = backend
        self._public_key = public_key
        self._signature = signature
        self._digest = hashes.Hash(algorithm, backend)

    def update(self, data):
        """
        Feed ``data`` into the running hash.
        """
        self._digest.update(data)

    def verify(self):
        """
        Finalize the hash and verify the stored signature against it.

        Raises ``InvalidSignature`` (from ``_ecdsa_sig_verify``) when the
        signature does not verify.
        """
        digest = self._digest.finalize()
        _ecdsa_sig_verify(
            self._backend, self._public_key, self._signature, digest
        )
140
141
@utils.register_interface(ec.EllipticCurvePrivateKeyWithSerialization)
class _EllipticCurvePrivateKey(object):
    """
    Elliptic curve private key backed by the ``ec_key_cdata`` and
    ``evp_pkey`` handles supplied by the backend.
    """

    def __init__(self, backend, ec_key_cdata, evp_pkey):
        self._backend = backend
        self._ec_key = ec_key_cdata
        self._evp_pkey = evp_pkey

        # Resolve the curve object first (this raises for unnamed or
        # unsupported curves), then flag the key as using a named curve.
        sn = _ec_key_curve_sn(backend, ec_key_cdata)
        self._curve = _sn_to_elliptic_curve(backend, sn)
        _mark_asn1_named_ec_curve(backend, ec_key_cdata)

    curve = utils.read_only_property("_curve")

    @property
    def key_size(self):
        """
        The ``key_size`` of this key's curve.
        """
        return self.curve.key_size

    def signer(self, signature_algorithm):
        """
        Return an ``_ECDSASignatureContext`` for incremental signing.

        Emits the sign/verify deprecation warning. ``signature_algorithm``
        must be ``ec.ECDSA`` and is also passed through
        ``_check_not_prehashed``.
        """
        _warn_sign_verify_deprecated()
        _check_signature_algorithm(signature_algorithm)
        _check_not_prehashed(signature_algorithm.algorithm)
        return _ECDSASignatureContext(
            self._backend, self, signature_algorithm.algorithm
        )

    def exchange(self, algorithm, peer_public_key):
        """
        Run ``ECDH_compute_key`` against ``peer_public_key`` and return the
        resulting shared secret as bytes.

        Raises ``UnsupportedAlgorithm`` when the backend reports that
        ``algorithm`` is not supported for this curve, and ``ValueError``
        when the peer key's curve name differs from this key's.
        """
        if not (
            self._backend.elliptic_curve_exchange_algorithm_supported(
                algorithm, self.curve
            )
        ):
            raise UnsupportedAlgorithm(
                "This backend does not support the ECDH algorithm.",
                _Reasons.UNSUPPORTED_EXCHANGE_ALGORITHM,
            )

        if peer_public_key.curve.name != self.curve.name:
            raise ValueError(
                "peer_public_key and self are not on the same curve"
            )

        group = self._backend._lib.EC_KEY_get0_group(self._ec_key)
        # Output length in bytes: the group degree (in bits) rounded up.
        z_len = (self._backend._lib.EC_GROUP_get_degree(group) + 7) // 8
        self._backend.openssl_assert(z_len > 0)
        z_buf = self._backend._ffi.new("uint8_t[]", z_len)
        peer_key = self._backend._lib.EC_KEY_get0_public_key(
            peer_public_key._ec_key
        )

        r = self._backend._lib.ECDH_compute_key(
            z_buf, z_len, peer_key, self._ec_key, self._backend._ffi.NULL
        )
        self._backend.openssl_assert(r > 0)
        return self._backend._ffi.buffer(z_buf)[:z_len]

    def public_key(self):
        """
        Return a new ``_EllipticCurvePublicKey`` holding this key's public
        point.

        A fresh EC key is created for the same curve NID and only the public
        point is copied onto it.
        """
        group = self._backend._lib.EC_KEY_get0_group(self._ec_key)
        self._backend.openssl_assert(group != self._backend._ffi.NULL)

        curve_nid = self._backend._lib.EC_GROUP_get_curve_name(group)
        public_ec_key = self._backend._ec_key_new_by_curve_nid(curve_nid)

        point = self._backend._lib.EC_KEY_get0_public_key(self._ec_key)
        self._backend.openssl_assert(point != self._backend._ffi.NULL)

        res = self._backend._lib.EC_KEY_set_public_key(public_ec_key, point)
        self._backend.openssl_assert(res == 1)

        evp_pkey = self._backend._ec_cdata_to_evp_pkey(public_ec_key)

        return _EllipticCurvePublicKey(self._backend, public_ec_key, evp_pkey)

    def private_numbers(self):
        """
        Return ``ec.EllipticCurvePrivateNumbers`` with the private value as
        an integer and the public numbers of the matching public key.
        """
        bn = self._backend._lib.EC_KEY_get0_private_key(self._ec_key)
        private_value = self._backend._bn_to_int(bn)
        return ec.EllipticCurvePrivateNumbers(
            private_value=private_value,
            public_numbers=self.public_key().public_numbers(),
        )

    def private_bytes(self, encoding, format, encryption_algorithm):
        """
        Serialize the key by delegating to the backend's
        ``_private_key_bytes`` with the given encoding, format and
        encryption algorithm.
        """
        return self._backend._private_key_bytes(
            encoding,
            format,
            encryption_algorithm,
            self,
            self._evp_pkey,
            self._ec_key,
        )

    def sign(self, data, signature_algorithm):
        """
        Sign ``data`` in one call and return the signature bytes.

        ``signature_algorithm`` must be ``ec.ECDSA``. ``data`` is passed
        through ``_calculate_digest_and_algorithm`` and the result is signed.
        """
        _check_signature_algorithm(signature_algorithm)
        # Only the first element of the helper's result is needed here.
        data, _ = _calculate_digest_and_algorithm(
            self._backend, data, signature_algorithm._algorithm
        )
        return _ecdsa_sig_sign(self._backend, self, data)
238
239
@utils.register_interface(ec.EllipticCurvePublicKeyWithSerialization)
class _EllipticCurvePublicKey(object):
    """
    Elliptic curve public key backed by the ``ec_key_cdata`` and
    ``evp_pkey`` handles supplied by the backend.
    """

    def __init__(self, backend, ec_key_cdata, evp_pkey):
        self._backend = backend
        self._ec_key = ec_key_cdata
        self._evp_pkey = evp_pkey

        # Resolve the curve object first (this raises for unnamed or
        # unsupported curves), then flag the key as using a named curve.
        sn = _ec_key_curve_sn(backend, ec_key_cdata)
        self._curve = _sn_to_elliptic_curve(backend, sn)
        _mark_asn1_named_ec_curve(backend, ec_key_cdata)

    curve = utils.read_only_property("_curve")

    @property
    def key_size(self):
        """
        The ``key_size`` of this key's curve.
        """
        return self.curve.key_size

    def verifier(self, signature, signature_algorithm):
        """
        Return an ``_ECDSAVerificationContext`` for incremental verification
        of ``signature``.

        Emits the sign/verify deprecation warning. ``signature`` is checked
        with ``utils._check_bytes``; ``signature_algorithm`` must be
        ``ec.ECDSA`` and is also passed through ``_check_not_prehashed``.
        """
        _warn_sign_verify_deprecated()
        utils._check_bytes("signature", signature)

        _check_signature_algorithm(signature_algorithm)
        _check_not_prehashed(signature_algorithm.algorithm)
        return _ECDSAVerificationContext(
            self._backend, self, signature, signature_algorithm.algorithm
        )

    def public_numbers(self):
        """
        Return ``ec.EllipticCurvePublicNumbers`` holding the affine ``x`` and
        ``y`` coordinates of the public point and this key's curve.
        """
        get_func, group = self._backend._ec_key_determine_group_get_func(
            self._ec_key
        )
        point = self._backend._lib.EC_KEY_get0_public_key(self._ec_key)
        self._backend.openssl_assert(point != self._backend._ffi.NULL)

        with self._backend._tmp_bn_ctx() as bn_ctx:
            bn_x = self._backend._lib.BN_CTX_get(bn_ctx)
            bn_y = self._backend._lib.BN_CTX_get(bn_ctx)

            res = get_func(group, point, bn_x, bn_y, bn_ctx)
            self._backend.openssl_assert(res == 1)

            # Convert to Python ints inside the ``with`` block, while the
            # temporary BN context that handed out bn_x/bn_y is still open.
            x = self._backend._bn_to_int(bn_x)
            y = self._backend._bn_to_int(bn_y)

        return ec.EllipticCurvePublicNumbers(x=x, y=y, curve=self._curve)

    def _encode_point(self, format):
        """
        Encode the public point with ``EC_POINT_point2oct`` in compressed or
        uncompressed form and return the bytes.

        ``format`` must be ``PublicFormat.CompressedPoint`` or
        ``PublicFormat.UncompressedPoint``; ``public_bytes`` validates this
        before calling.
        """
        if format is serialization.PublicFormat.CompressedPoint:
            conversion = self._backend._lib.POINT_CONVERSION_COMPRESSED
        else:
            assert format is serialization.PublicFormat.UncompressedPoint
            conversion = self._backend._lib.POINT_CONVERSION_UNCOMPRESSED

        group = self._backend._lib.EC_KEY_get0_group(self._ec_key)
        self._backend.openssl_assert(group != self._backend._ffi.NULL)
        point = self._backend._lib.EC_KEY_get0_public_key(self._ec_key)
        self._backend.openssl_assert(point != self._backend._ffi.NULL)
        with self._backend._tmp_bn_ctx() as bn_ctx:
            # First call with a NULL buffer only asks for the required
            # length; the second call writes the encoded point.
            buflen = self._backend._lib.EC_POINT_point2oct(
                group, point, conversion, self._backend._ffi.NULL, 0, bn_ctx
            )
            self._backend.openssl_assert(buflen > 0)
            buf = self._backend._ffi.new("char[]", buflen)
            res = self._backend._lib.EC_POINT_point2oct(
                group, point, conversion, buf, buflen, bn_ctx
            )
            self._backend.openssl_assert(buflen == res)

        return self._backend._ffi.buffer(buf)[:]

    def public_bytes(self, encoding, format):
        """
        Serialize the public key.

        ``Encoding.X962`` and the two point formats are only valid together
        and produce the raw encoded point; any other combination is
        delegated to the backend's ``_public_key_bytes``.

        Raises ``ValueError`` when X962 or a point format is requested
        without its counterpart.
        """

        if (
            encoding is serialization.Encoding.X962
            or format is serialization.PublicFormat.CompressedPoint
            or format is serialization.PublicFormat.UncompressedPoint
        ):
            if encoding is not serialization.Encoding.X962 or format not in (
                serialization.PublicFormat.CompressedPoint,
                serialization.PublicFormat.UncompressedPoint,
            ):
                raise ValueError(
                    "X962 encoding must be used with CompressedPoint or "
                    "UncompressedPoint format"
                )

            return self._encode_point(format)
        else:
            return self._backend._public_key_bytes(
                encoding, format, self, self._evp_pkey, None
            )

    def verify(self, signature, data, signature_algorithm):
        """
        Verify ``signature`` over ``data`` in one call.

        ``signature_algorithm`` must be ``ec.ECDSA``. ``data`` is passed
        through ``_calculate_digest_and_algorithm`` and the result is checked
        against ``signature``. Raises ``InvalidSignature`` (from
        ``_ecdsa_sig_verify``) on failure.
        """
        _check_signature_algorithm(signature_algorithm)
        # Only the first element of the helper's result is needed here.
        data, _ = _calculate_digest_and_algorithm(
            self._backend, data, signature_algorithm._algorithm
        )
        _ecdsa_sig_verify(self._backend, self, signature, data)
338