# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.

from __future__ import absolute_import, division, print_function

from cryptography import utils
from cryptography.exceptions import (
    InvalidSignature,
    UnsupportedAlgorithm,
    _Reasons,
)
from cryptography.hazmat.backends.openssl.utils import (
    _calculate_digest_and_algorithm,
    _check_not_prehashed,
    _warn_sign_verify_deprecated,
)
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import (
    AsymmetricSignatureContext,
    AsymmetricVerificationContext,
    ec,
)


def _check_signature_algorithm(signature_algorithm):
    """
    Raise UnsupportedAlgorithm unless ``signature_algorithm`` is an
    ``ec.ECDSA`` instance. Returns None when the check passes.
    """
    if not isinstance(signature_algorithm, ec.ECDSA):
        raise UnsupportedAlgorithm(
            "Unsupported elliptic curve signature algorithm.",
            _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM,
        )


def _ec_key_curve_sn(backend, ec_key):
    """
    Return the OpenSSL short name (as text) of the named curve used by
    ``ec_key``.

    Raises NotImplementedError when the key's group has no curve NID or,
    on non-LibreSSL builds, when the group's ASN.1 flag is 0.
    """
    group = backend._lib.EC_KEY_get0_group(ec_key)
    backend.openssl_assert(group != backend._ffi.NULL)

    nid = backend._lib.EC_GROUP_get_curve_name(group)
    # The following check is to find EC keys with unnamed curves and raise
    # an error for now.
    if nid == backend._lib.NID_undef:
        raise NotImplementedError(
            "ECDSA keys with unnamed curves are unsupported at this time"
        )

    # This is like the above check, but it also catches the case where you
    # explicitly encoded a curve with the same parameters as a named curve.
    # Don't do that.
    if (
        not backend._lib.CRYPTOGRAPHY_IS_LIBRESSL
        and backend._lib.EC_GROUP_get_asn1_flag(group) == 0
    ):
        raise NotImplementedError(
            "ECDSA keys with unnamed curves are unsupported at this time"
        )

    curve_name = backend._lib.OBJ_nid2sn(nid)
    backend.openssl_assert(curve_name != backend._ffi.NULL)

    sn = backend._ffi.string(curve_name).decode("ascii")
    return sn


def _mark_asn1_named_ec_curve(backend, ec_cdata):
    """
    Set the named curve flag on the EC_KEY. This causes OpenSSL to
    serialize EC keys along with their curve OID which makes
    deserialization easier.
    """

    backend._lib.EC_KEY_set_asn1_flag(
        ec_cdata, backend._lib.OPENSSL_EC_NAMED_CURVE
    )


def _sn_to_elliptic_curve(backend, sn):
    """
    Instantiate the curve class registered in ``ec._CURVE_TYPES`` under the
    short name ``sn``.

    ``backend`` is accepted but not used by this function. Raises
    UnsupportedAlgorithm when ``sn`` has no entry in the mapping.
    """
    try:
        return ec._CURVE_TYPES[sn]()
    except KeyError:
        raise UnsupportedAlgorithm(
            "{} is not a supported elliptic curve".format(sn),
            _Reasons.UNSUPPORTED_ELLIPTIC_CURVE,
        )


def _ecdsa_sig_sign(backend, private_key, data):
    """
    Sign ``data`` with ``private_key._ec_key`` via ECDSA_sign and return the
    signature bytes.

    ``data`` is passed to OpenSSL unchanged; callers in this module hand in
    an already computed digest.
    """
    # Size the output buffer from what OpenSSL reports for this key; the
    # actual length written is returned through siglen_ptr.
    max_size = backend._lib.ECDSA_size(private_key._ec_key)
    backend.openssl_assert(max_size > 0)

    sigbuf = backend._ffi.new("unsigned char[]", max_size)
    siglen_ptr = backend._ffi.new("unsigned int[]", 1)
    res = backend._lib.ECDSA_sign(
        0, data, len(data), sigbuf, siglen_ptr, private_key._ec_key
    )
    backend.openssl_assert(res == 1)
    # Only the first siglen bytes of the buffer belong to the signature.
    return backend._ffi.buffer(sigbuf)[: siglen_ptr[0]]


def _ecdsa_sig_verify(backend, public_key, signature, data):
    """
    Verify ``signature`` over ``data`` with ``public_key._ec_key`` via
    ECDSA_verify.

    Returns None on success. On any result other than 1 the backend's
    pending errors are consumed and InvalidSignature is raised.
    """
    res = backend._lib.ECDSA_verify(
        0, data, len(data), signature, len(signature), public_key._ec_key
    )
    if res != 1:
        backend._consume_errors()
        raise InvalidSignature


@utils.register_interface(AsymmetricSignatureContext)
class _ECDSASignatureContext(object):
    """
    Incremental signing context: hashes the data fed to ``update`` and
    signs the resulting digest in ``finalize``.
    """

    def __init__(self, backend, private_key, algorithm):
        self._backend = backend
        self._private_key = private_key
        self._digest = hashes.Hash(algorithm, backend)

    def update(self, data):
        """Feed ``data`` into the running hash."""
        self._digest.update(data)

    def finalize(self):
        """Finalize the hash and return the ECDSA signature over it."""
        digest = self._digest.finalize()

        return _ecdsa_sig_sign(self._backend, self._private_key, digest)


@utils.register_interface(AsymmetricVerificationContext)
class _ECDSAVerificationContext(object):
    """
    Incremental verification context: hashes the data fed to ``update``
    and checks the stored signature against the digest in ``verify``.
    """

    def __init__(self, backend, public_key, signature, algorithm):
        self._backend = backend
        self._public_key = public_key
        self._signature = signature
        self._digest = hashes.Hash(algorithm, backend)

    def update(self, data):
        """Feed ``data`` into the running hash."""
        self._digest.update(data)

    def verify(self):
        """
        Finalize the hash and verify the stored signature against it.
        Raises InvalidSignature (from ``_ecdsa_sig_verify``) on failure.
        """
        digest = self._digest.finalize()
        _ecdsa_sig_verify(
            self._backend, self._public_key, self._signature, digest
        )


@utils.register_interface(ec.EllipticCurvePrivateKeyWithSerialization)
class _EllipticCurvePrivateKey(object):
    """
    Private EC key wrapping an EC_KEY cdata object and its EVP_PKEY.
    """

    def __init__(self, backend, ec_key_cdata, evp_pkey):
        self._backend = backend
        self._ec_key = ec_key_cdata
        self._evp_pkey = evp_pkey

        # Resolve the curve first: this raises for unnamed or unsupported
        # curves before the key is modified.
        sn = _ec_key_curve_sn(backend, ec_key_cdata)
        self._curve = _sn_to_elliptic_curve(backend, sn)
        _mark_asn1_named_ec_curve(backend, ec_key_cdata)

    curve = utils.read_only_property("_curve")

    @property
    def key_size(self):
        """Key size as reported by the curve object."""
        return self.curve.key_size

    def signer(self, signature_algorithm):
        """
        Return an ``_ECDSASignatureContext`` for ``signature_algorithm``.

        Emits the sign/verify deprecation warning, and rejects non-ECDSA
        algorithms and prehashed hash algorithms via the imported checks.
        """
        _warn_sign_verify_deprecated()
        _check_signature_algorithm(signature_algorithm)
        _check_not_prehashed(signature_algorithm.algorithm)
        return _ECDSASignatureContext(
            self._backend, self, signature_algorithm.algorithm
        )

    def exchange(self, algorithm, peer_public_key):
        """
        Compute the shared secret with ``peer_public_key`` via
        ECDH_compute_key and return it as bytes.

        Raises UnsupportedAlgorithm when the backend reports the exchange
        algorithm as unsupported for this curve, and ValueError when the
        peer key's curve name differs from this key's.
        """
        if not (
            self._backend.elliptic_curve_exchange_algorithm_supported(
                algorithm, self.curve
            )
        ):
            raise UnsupportedAlgorithm(
                "This backend does not support the ECDH algorithm.",
                _Reasons.UNSUPPORTED_EXCHANGE_ALGORITHM,
            )

        if peer_public_key.curve.name != self.curve.name:
            raise ValueError(
                "peer_public_key and self are not on the same curve"
            )

        group = self._backend._lib.EC_KEY_get0_group(self._ec_key)
        # Guard against a NULL group before handing it to OpenSSL, matching
        # the other EC_KEY_get0_group call sites in this module.
        self._backend.openssl_assert(group != self._backend._ffi.NULL)
        # Number of whole bytes needed to hold a value of the group's
        # degree (in bits), rounded up.
        z_len = (self._backend._lib.EC_GROUP_get_degree(group) + 7) // 8
        self._backend.openssl_assert(z_len > 0)
        z_buf = self._backend._ffi.new("uint8_t[]", z_len)
        peer_key = self._backend._lib.EC_KEY_get0_public_key(
            peer_public_key._ec_key
        )

        r = self._backend._lib.ECDH_compute_key(
            z_buf, z_len, peer_key, self._ec_key, self._backend._ffi.NULL
        )
        self._backend.openssl_assert(r > 0)
        return self._backend._ffi.buffer(z_buf)[:z_len]

    def public_key(self):
        """
        Build and return an ``_EllipticCurvePublicKey`` holding this key's
        public point on a freshly created EC_KEY for the same curve NID.
        """
        group = self._backend._lib.EC_KEY_get0_group(self._ec_key)
        self._backend.openssl_assert(group != self._backend._ffi.NULL)

        curve_nid = self._backend._lib.EC_GROUP_get_curve_name(group)
        public_ec_key = self._backend._ec_key_new_by_curve_nid(curve_nid)

        point = self._backend._lib.EC_KEY_get0_public_key(self._ec_key)
        self._backend.openssl_assert(point != self._backend._ffi.NULL)

        res = self._backend._lib.EC_KEY_set_public_key(public_ec_key, point)
        self._backend.openssl_assert(res == 1)

        evp_pkey = self._backend._ec_cdata_to_evp_pkey(public_ec_key)

        return _EllipticCurvePublicKey(self._backend, public_ec_key, evp_pkey)

    def private_numbers(self):
        """
        Return ``ec.EllipticCurvePrivateNumbers`` built from the private
        scalar and the public numbers of the derived public key.
        """
        bn = self._backend._lib.EC_KEY_get0_private_key(self._ec_key)
        private_value = self._backend._bn_to_int(bn)
        return ec.EllipticCurvePrivateNumbers(
            private_value=private_value,
            public_numbers=self.public_key().public_numbers(),
        )

    def private_bytes(self, encoding, format, encryption_algorithm):
        """
        Serialize the key by delegating to ``backend._private_key_bytes``
        with the given encoding, format and encryption algorithm.
        """
        return self._backend._private_key_bytes(
            encoding,
            format,
            encryption_algorithm,
            self,
            self._evp_pkey,
            self._ec_key,
        )

    def sign(self, data, signature_algorithm):
        """
        Sign ``data`` in one shot and return the signature bytes.

        Raises UnsupportedAlgorithm for non-ECDSA signature algorithms.
        """
        _check_signature_algorithm(signature_algorithm)
        # Only the first element of the helper's result is needed here;
        # the returned algorithm is intentionally discarded.
        data, _ = _calculate_digest_and_algorithm(
            self._backend, data, signature_algorithm._algorithm
        )
        return _ecdsa_sig_sign(self._backend, self, data)


@utils.register_interface(ec.EllipticCurvePublicKeyWithSerialization)
class _EllipticCurvePublicKey(object):
    """
    Public EC key wrapping an EC_KEY cdata object and its EVP_PKEY.
    """

    def __init__(self, backend, ec_key_cdata, evp_pkey):
        self._backend = backend
        self._ec_key = ec_key_cdata
        self._evp_pkey = evp_pkey

        # Resolve the curve first: this raises for unnamed or unsupported
        # curves before the key is modified.
        sn = _ec_key_curve_sn(backend, ec_key_cdata)
        self._curve = _sn_to_elliptic_curve(backend, sn)
        _mark_asn1_named_ec_curve(backend, ec_key_cdata)

    curve = utils.read_only_property("_curve")

    @property
    def key_size(self):
        """Key size as reported by the curve object."""
        return self.curve.key_size

    def verifier(self, signature, signature_algorithm):
        """
        Return an ``_ECDSAVerificationContext`` for ``signature``.

        Emits the sign/verify deprecation warning, checks that
        ``signature`` is bytes, and rejects non-ECDSA algorithms and
        prehashed hash algorithms via the imported checks.
        """
        _warn_sign_verify_deprecated()
        utils._check_bytes("signature", signature)

        _check_signature_algorithm(signature_algorithm)
        _check_not_prehashed(signature_algorithm.algorithm)
        return _ECDSAVerificationContext(
            self._backend, self, signature, signature_algorithm.algorithm
        )

    def public_numbers(self):
        """
        Return ``ec.EllipticCurvePublicNumbers`` holding the x and y
        coordinates of the public point and this key's curve.
        """
        # The backend picks the coordinate getter for this key's group.
        get_func, group = self._backend._ec_key_determine_group_get_func(
            self._ec_key
        )
        point = self._backend._lib.EC_KEY_get0_public_key(self._ec_key)
        self._backend.openssl_assert(point != self._backend._ffi.NULL)

        with self._backend._tmp_bn_ctx() as bn_ctx:
            bn_x = self._backend._lib.BN_CTX_get(bn_ctx)
            bn_y = self._backend._lib.BN_CTX_get(bn_ctx)

            res = get_func(group, point, bn_x, bn_y, bn_ctx)
            self._backend.openssl_assert(res == 1)

            # Convert while the temporary BN context is still open, since
            # bn_x and bn_y were obtained from it.
            x = self._backend._bn_to_int(bn_x)
            y = self._backend._bn_to_int(bn_y)

        return ec.EllipticCurvePublicNumbers(x=x, y=y, curve=self._curve)

    def _encode_point(self, format):
        """
        Encode the public point with EC_POINT_point2oct and return the
        bytes. ``format`` must be ``PublicFormat.CompressedPoint`` or
        ``PublicFormat.UncompressedPoint``.
        """
        if format is serialization.PublicFormat.CompressedPoint:
            conversion = self._backend._lib.POINT_CONVERSION_COMPRESSED
        else:
            # Internal invariant: public_bytes only calls this method with
            # one of the two point formats.
            assert format is serialization.PublicFormat.UncompressedPoint
            conversion = self._backend._lib.POINT_CONVERSION_UNCOMPRESSED

        group = self._backend._lib.EC_KEY_get0_group(self._ec_key)
        self._backend.openssl_assert(group != self._backend._ffi.NULL)
        point = self._backend._lib.EC_KEY_get0_public_key(self._ec_key)
        self._backend.openssl_assert(point != self._backend._ffi.NULL)
        with self._backend._tmp_bn_ctx() as bn_ctx:
            # First call with a NULL buffer to obtain the required length,
            # second call to write the encoding into the allocated buffer.
            buflen = self._backend._lib.EC_POINT_point2oct(
                group, point, conversion, self._backend._ffi.NULL, 0, bn_ctx
            )
            self._backend.openssl_assert(buflen > 0)
            buf = self._backend._ffi.new("char[]", buflen)
            res = self._backend._lib.EC_POINT_point2oct(
                group, point, conversion, buf, buflen, bn_ctx
            )
            self._backend.openssl_assert(buflen == res)

        return self._backend._ffi.buffer(buf)[:]

    def public_bytes(self, encoding, format):
        """
        Serialize the public key and return the bytes.

        X962 encoding and the CompressedPoint/UncompressedPoint formats
        are only valid together; any other pairing that involves one of
        them raises ValueError. All remaining combinations are delegated
        to ``backend._public_key_bytes``.
        """

        if (
            encoding is serialization.Encoding.X962
            or format is serialization.PublicFormat.CompressedPoint
            or format is serialization.PublicFormat.UncompressedPoint
        ):
            if encoding is not serialization.Encoding.X962 or format not in (
                serialization.PublicFormat.CompressedPoint,
                serialization.PublicFormat.UncompressedPoint,
            ):
                raise ValueError(
                    "X962 encoding must be used with CompressedPoint or "
                    "UncompressedPoint format"
                )

            return self._encode_point(format)
        else:
            return self._backend._public_key_bytes(
                encoding, format, self, self._evp_pkey, None
            )

    def verify(self, signature, data, signature_algorithm):
        """
        Verify ``signature`` over ``data`` in one shot.

        Raises UnsupportedAlgorithm for non-ECDSA signature algorithms and
        InvalidSignature (from ``_ecdsa_sig_verify``) on failure.
        """
        _check_signature_algorithm(signature_algorithm)
        # Only the first element of the helper's result is needed here;
        # the returned algorithm is intentionally discarded.
        data, _ = _calculate_digest_and_algorithm(
            self._backend, data, signature_algorithm._algorithm
        )
        _ecdsa_sig_verify(self._backend, self, signature, data)