• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.
4
5from __future__ import absolute_import, division, print_function
6
7from cryptography import utils
8from cryptography.exceptions import (
9    InvalidSignature, UnsupportedAlgorithm, _Reasons
10)
11from cryptography.hazmat.backends.openssl.utils import (
12    _calculate_digest_and_algorithm, _check_not_prehashed,
13    _warn_sign_verify_deprecated
14)
15from cryptography.hazmat.primitives import hashes, serialization
16from cryptography.hazmat.primitives.asymmetric import (
17    AsymmetricSignatureContext, AsymmetricVerificationContext, ec
18)
19
20
def _check_signature_algorithm(signature_algorithm):
    """
    Accept only ``ec.ECDSA`` instances as signature algorithms.

    Returns ``None`` when ``signature_algorithm`` is an ``ec.ECDSA``
    instance; otherwise raises ``UnsupportedAlgorithm`` with the
    ``UNSUPPORTED_PUBLIC_KEY_ALGORITHM`` reason.
    """
    if isinstance(signature_algorithm, ec.ECDSA):
        return

    raise UnsupportedAlgorithm(
        "Unsupported elliptic curve signature algorithm.",
        _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM
    )
26
27
def _ec_key_curve_sn(backend, ec_key):
    """
    Return the OpenSSL short name of the curve that ``ec_key`` uses.

    The key's group is looked up, its curve NID is converted to a short
    name with ``OBJ_nid2sn`` and the result is decoded from ASCII. A group
    whose NID is ``NID_undef`` raises ``NotImplementedError``.
    """
    lib = backend._lib
    ffi = backend._ffi

    ec_group = lib.EC_KEY_get0_group(ec_key)
    backend.openssl_assert(ec_group != ffi.NULL)

    curve_nid = lib.EC_GROUP_get_curve_name(ec_group)
    # NID_undef here means the key uses an unnamed curve; those are
    # rejected with an error for now.
    if curve_nid == lib.NID_undef:
        raise NotImplementedError(
            "ECDSA certificates with unnamed curves are unsupported "
            "at this time"
        )

    short_name = lib.OBJ_nid2sn(curve_nid)
    backend.openssl_assert(short_name != ffi.NULL)

    return ffi.string(short_name).decode('ascii')
46
47
def _mark_asn1_named_ec_curve(backend, ec_cdata):
    """
    Set the ``OPENSSL_EC_NAMED_CURVE`` ASN.1 flag on the given EC_KEY.

    With the flag set, OpenSSL serializes EC keys along with their curve
    OID, which makes deserialization easier.
    """
    lib = backend._lib
    lib.EC_KEY_set_asn1_flag(ec_cdata, lib.OPENSSL_EC_NAMED_CURVE)
58
59
def _sn_to_elliptic_curve(backend, sn):
    """
    Instantiate the curve registered in ``ec._CURVE_TYPES`` under ``sn``.

    ``backend`` is accepted but not used by this function. A ``KeyError``
    from the lookup/instantiation is reported as ``UnsupportedAlgorithm``
    with the ``UNSUPPORTED_ELLIPTIC_CURVE`` reason.
    """
    try:
        curve = ec._CURVE_TYPES[sn]()
    except KeyError:
        message = "{0} is not a supported elliptic curve".format(sn)
        raise UnsupportedAlgorithm(
            message, _Reasons.UNSUPPORTED_ELLIPTIC_CURVE
        )
    else:
        return curve
68
69
def _ecdsa_sig_sign(backend, private_key, data):
    """
    Sign ``data`` with ``private_key`` through ``ECDSA_sign``.

    ``data`` is handed to OpenSSL unchanged (the signature context in this
    file passes a finalized hash digest). Returns the signature, cut to
    the length that ``ECDSA_sign`` wrote into its length output.
    """
    lib = backend._lib
    ffi = backend._ffi
    ec_key = private_key._ec_key

    # ECDSA_size's result is used as the size of the output buffer.
    capacity = lib.ECDSA_size(ec_key)
    backend.openssl_assert(capacity > 0)

    signature = ffi.new("unsigned char[]", capacity)
    signature_len = ffi.new("unsigned int[]", 1)
    status = lib.ECDSA_sign(
        0, data, len(data), signature, signature_len, ec_key
    )
    backend.openssl_assert(status == 1)

    # Only the first signature_len[0] bytes of the buffer are returned.
    return ffi.buffer(signature)[:signature_len[0]]
81
82
def _ecdsa_sig_verify(backend, public_key, signature, data):
    """
    Check ``signature`` over ``data`` with ``public_key`` through
    ``ECDSA_verify``.

    Returns ``None`` when OpenSSL reports 1. For any other result,
    ``backend._consume_errors()`` is called and ``InvalidSignature`` is
    raised.
    """
    status = backend._lib.ECDSA_verify(
        0, data, len(data), signature, len(signature), public_key._ec_key
    )
    if status == 1:
        return

    backend._consume_errors()
    raise InvalidSignature
90
91
@utils.register_interface(AsymmetricSignatureContext)
class _ECDSASignatureContext(object):
    """
    Incremental ECDSA signer: data passed to ``update`` is hashed and the
    finalized digest is signed in ``finalize``.
    """

    def __init__(self, backend, private_key, algorithm):
        self._backend = backend
        self._private_key = private_key
        # Hash object that accumulates everything passed to update()
        self._digest = hashes.Hash(algorithm, backend)

    def update(self, data):
        """
        Feed ``data`` into the underlying hash.
        """
        self._digest.update(data)

    def finalize(self):
        """
        Finalize the hash and return the signature over its digest.
        """
        return _ecdsa_sig_sign(
            self._backend, self._private_key, self._digest.finalize()
        )
106
107
@utils.register_interface(AsymmetricVerificationContext)
class _ECDSAVerificationContext(object):
    """
    Incremental ECDSA verifier: data passed to ``update`` is hashed and the
    stored signature is checked against the finalized digest in ``verify``.
    """

    def __init__(self, backend, public_key, signature, algorithm):
        self._backend = backend
        self._public_key = public_key
        self._signature = signature
        # Hash object that accumulates everything passed to update()
        self._digest = hashes.Hash(algorithm, backend)

    def update(self, data):
        """
        Feed ``data`` into the underlying hash.
        """
        self._digest.update(data)

    def verify(self):
        """
        Finalize the hash and verify the stored signature over its digest.

        Returns ``None``; a failed check surfaces as the exception raised
        by ``_ecdsa_sig_verify``.
        """
        _ecdsa_sig_verify(
            self._backend,
            self._public_key,
            self._signature,
            self._digest.finalize(),
        )
124
125
@utils.register_interface(ec.EllipticCurvePrivateKeyWithSerialization)
class _EllipticCurvePrivateKey(object):
    """
    EC private key wrapping an ``EC_KEY`` cdata object and its ``EVP_PKEY``.
    """

    def __init__(self, backend, ec_key_cdata, evp_pkey):
        self._backend = backend
        _mark_asn1_named_ec_curve(backend, ec_key_cdata)
        self._ec_key = ec_key_cdata
        self._evp_pkey = evp_pkey

        # Map the key's OpenSSL curve short name to a curve object
        self._curve = _sn_to_elliptic_curve(
            backend, _ec_key_curve_sn(backend, ec_key_cdata)
        )

    curve = utils.read_only_property("_curve")

    @property
    def key_size(self):
        """
        The ``key_size`` of this key's curve object.
        """
        return self.curve.key_size

    def signer(self, signature_algorithm):
        """
        Return an ``_ECDSASignatureContext`` for this key.

        Calls ``_warn_sign_verify_deprecated()`` first, then checks the
        signature algorithm and its hash algorithm with the module's
        ``_check_*`` helpers.
        """
        _warn_sign_verify_deprecated()
        _check_signature_algorithm(signature_algorithm)
        _check_not_prehashed(signature_algorithm.algorithm)

        hash_algorithm = signature_algorithm.algorithm
        return _ECDSASignatureContext(self._backend, self, hash_algorithm)

    def exchange(self, algorithm, peer_public_key):
        """
        Run ``ECDH_compute_key`` against ``peer_public_key`` and return the
        resulting bytes.

        Raises ``UnsupportedAlgorithm`` when the backend reports that
        ``algorithm`` is not supported for this curve, and ``ValueError``
        when the peer key's curve name differs from this key's.
        """
        backend = self._backend
        supported = backend.elliptic_curve_exchange_algorithm_supported(
            algorithm, self.curve
        )
        if not supported:
            raise UnsupportedAlgorithm(
                "This backend does not support the ECDH algorithm.",
                _Reasons.UNSUPPORTED_EXCHANGE_ALGORITHM
            )

        if peer_public_key.curve.name != self.curve.name:
            raise ValueError(
                "peer_public_key and self are not on the same curve"
            )

        lib = backend._lib
        ffi = backend._ffi

        ec_group = lib.EC_KEY_get0_group(self._ec_key)
        # Output length: the group's degree in bits, rounded up to bytes
        secret_len = (lib.EC_GROUP_get_degree(ec_group) + 7) // 8
        backend.openssl_assert(secret_len > 0)
        secret_buf = ffi.new("uint8_t[]", secret_len)
        peer_point = lib.EC_KEY_get0_public_key(peer_public_key._ec_key)

        computed = lib.ECDH_compute_key(
            secret_buf, secret_len, peer_point, self._ec_key, ffi.NULL
        )
        backend.openssl_assert(computed > 0)
        return ffi.buffer(secret_buf)[:secret_len]

    def public_key(self):
        """
        Return an ``_EllipticCurvePublicKey`` holding this key's public
        point.

        A new ``EC_KEY`` is created for the same curve NID and this key's
        public point is set on it.
        """
        backend = self._backend
        lib = backend._lib
        ffi = backend._ffi

        ec_group = lib.EC_KEY_get0_group(self._ec_key)
        backend.openssl_assert(ec_group != ffi.NULL)

        nid = lib.EC_GROUP_get_curve_name(ec_group)

        new_ec_key = lib.EC_KEY_new_by_curve_name(nid)
        backend.openssl_assert(new_ec_key != ffi.NULL)
        # EC_KEY_free is registered as the destructor of the new key
        new_ec_key = ffi.gc(new_ec_key, lib.EC_KEY_free)

        public_point = lib.EC_KEY_get0_public_key(self._ec_key)
        backend.openssl_assert(public_point != ffi.NULL)

        status = lib.EC_KEY_set_public_key(new_ec_key, public_point)
        backend.openssl_assert(status == 1)

        new_evp_pkey = backend._ec_cdata_to_evp_pkey(new_ec_key)

        return _EllipticCurvePublicKey(backend, new_ec_key, new_evp_pkey)

    def private_numbers(self):
        """
        Return ``ec.EllipticCurvePrivateNumbers`` built from the private
        value of the ``EC_KEY`` and the public numbers of ``public_key()``.
        """
        backend = self._backend
        private_bn = backend._lib.EC_KEY_get0_private_key(self._ec_key)
        secret_int = backend._bn_to_int(private_bn)
        return ec.EllipticCurvePrivateNumbers(
            private_value=secret_int,
            public_numbers=self.public_key().public_numbers()
        )

    def private_bytes(self, encoding, format, encryption_algorithm):
        """
        Serialize this key by delegating to the backend's
        ``_private_key_bytes``.
        """
        serialize = self._backend._private_key_bytes
        return serialize(
            encoding,
            format,
            encryption_algorithm,
            self._evp_pkey,
            self._ec_key,
        )

    def sign(self, data, signature_algorithm):
        """
        Sign ``data`` in one call and return the signature.

        ``signature_algorithm`` must be an ``ec.ECDSA`` instance. ``data``
        is first passed through ``_calculate_digest_and_algorithm``; only
        the first element of its result is used.
        """
        _check_signature_algorithm(signature_algorithm)
        digest, _ = _calculate_digest_and_algorithm(
            self._backend, data, signature_algorithm._algorithm
        )
        return _ecdsa_sig_sign(self._backend, self, digest)
226
227
@utils.register_interface(ec.EllipticCurvePublicKeyWithSerialization)
class _EllipticCurvePublicKey(object):
    """
    EC public key wrapping an ``EC_KEY`` cdata object and its ``EVP_PKEY``.
    """

    def __init__(self, backend, ec_key_cdata, evp_pkey):
        self._backend = backend
        _mark_asn1_named_ec_curve(backend, ec_key_cdata)
        self._ec_key = ec_key_cdata
        self._evp_pkey = evp_pkey

        # Map the key's OpenSSL curve short name to a curve object
        self._curve = _sn_to_elliptic_curve(
            backend, _ec_key_curve_sn(backend, ec_key_cdata)
        )

    curve = utils.read_only_property("_curve")

    @property
    def key_size(self):
        """
        The ``key_size`` of this key's curve object.
        """
        return self.curve.key_size

    def verifier(self, signature, signature_algorithm):
        """
        Return an ``_ECDSAVerificationContext`` for ``signature``.

        Calls ``_warn_sign_verify_deprecated()`` first, then checks
        ``signature`` with ``utils._check_bytes`` and the signature
        algorithm and its hash algorithm with the module's ``_check_*``
        helpers.
        """
        _warn_sign_verify_deprecated()
        utils._check_bytes("signature", signature)

        _check_signature_algorithm(signature_algorithm)
        _check_not_prehashed(signature_algorithm.algorithm)

        hash_algorithm = signature_algorithm.algorithm
        return _ECDSAVerificationContext(
            self._backend, self, signature, hash_algorithm
        )

    def public_numbers(self):
        """
        Return ``ec.EllipticCurvePublicNumbers`` with the x and y values
        of this key's public point and its curve.
        """
        backend = self._backend
        lib = backend._lib

        get_coordinates, ec_group = (
            backend._ec_key_determine_group_get_func(self._ec_key)
        )
        public_point = lib.EC_KEY_get0_public_key(self._ec_key)
        backend.openssl_assert(public_point != backend._ffi.NULL)

        with backend._tmp_bn_ctx() as bn_ctx:
            x_bn = lib.BN_CTX_get(bn_ctx)
            y_bn = lib.BN_CTX_get(bn_ctx)

            status = get_coordinates(
                ec_group, public_point, x_bn, y_bn, bn_ctx
            )
            backend.openssl_assert(status == 1)

            # x_bn / y_bn were obtained from bn_ctx, so they are converted
            # before the context block is left.
            x_int = backend._bn_to_int(x_bn)
            y_int = backend._bn_to_int(y_bn)

        return ec.EllipticCurvePublicNumbers(
            x=x_int, y=y_int, curve=self._curve
        )

    def _encode_point(self, format):
        """
        Encode the public point with ``EC_POINT_point2oct``.

        ``format`` selects compressed or uncompressed conversion; any
        format other than ``CompressedPoint`` is asserted to be
        ``UncompressedPoint``.
        """
        backend = self._backend
        lib = backend._lib
        ffi = backend._ffi

        if format is not serialization.PublicFormat.CompressedPoint:
            assert format is serialization.PublicFormat.UncompressedPoint
            conversion = lib.POINT_CONVERSION_UNCOMPRESSED
        else:
            conversion = lib.POINT_CONVERSION_COMPRESSED

        ec_group = lib.EC_KEY_get0_group(self._ec_key)
        backend.openssl_assert(ec_group != ffi.NULL)
        public_point = lib.EC_KEY_get0_public_key(self._ec_key)
        backend.openssl_assert(public_point != ffi.NULL)
        with backend._tmp_bn_ctx() as bn_ctx:
            # First call: NULL buffer and length 0; the result is used as
            # the size of the output buffer.
            needed = lib.EC_POINT_point2oct(
                ec_group, public_point, conversion, ffi.NULL, 0, bn_ctx
            )
            backend.openssl_assert(needed > 0)
            encoded = ffi.new("char[]", needed)
            # Second call: write the encoding into the allocated buffer.
            written = lib.EC_POINT_point2oct(
                ec_group, public_point, conversion, encoded, needed, bn_ctx
            )
            backend.openssl_assert(needed == written)

        return ffi.buffer(encoded)[:]

    def public_bytes(self, encoding, format):
        """
        Serialize this public key.

        ``X962`` encoding combined with ``CompressedPoint`` or
        ``UncompressedPoint`` is encoded by ``_encode_point``; using only
        one half of that pairing raises ``ValueError``, as does the
        ``PKCS1`` format. Everything else is delegated to the backend's
        ``_public_key_bytes``.
        """
        if format is serialization.PublicFormat.PKCS1:
            raise ValueError(
                "EC public keys do not support PKCS1 serialization"
            )

        point_formats = (
            serialization.PublicFormat.CompressedPoint,
            serialization.PublicFormat.UncompressedPoint
        )
        uses_x962 = encoding is serialization.Encoding.X962
        wants_point = any(
            format is candidate for candidate in point_formats
        )

        if not (uses_x962 or wants_point):
            return self._backend._public_key_bytes(
                encoding,
                format,
                self,
                self._evp_pkey,
                None
            )

        # At least one of X962 / point format was requested: require both.
        if not uses_x962 or format not in point_formats:
            raise ValueError(
                "X962 encoding must be used with CompressedPoint or "
                "UncompressedPoint format"
            )

        return self._encode_point(format)

    def verify(self, signature, data, signature_algorithm):
        """
        Verify ``signature`` over ``data`` in one call.

        ``signature_algorithm`` must be an ``ec.ECDSA`` instance. ``data``
        is first passed through ``_calculate_digest_and_algorithm``; only
        the first element of its result is used. Returns ``None``; a
        failed check surfaces as the exception raised by
        ``_ecdsa_sig_verify``.
        """
        _check_signature_algorithm(signature_algorithm)
        digest, _ = _calculate_digest_and_algorithm(
            self._backend, data, signature_algorithm._algorithm
        )
        _ecdsa_sig_verify(self._backend, self, signature, digest)
341