1# This file is dual licensed under the terms of the Apache License, Version 2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 3# for complete details. 4 5from __future__ import absolute_import, division, print_function 6 7from cryptography import utils 8from cryptography.exceptions import ( 9 InvalidSignature, UnsupportedAlgorithm, _Reasons 10) 11from cryptography.hazmat.backends.openssl.utils import ( 12 _calculate_digest_and_algorithm, _check_not_prehashed, 13 _warn_sign_verify_deprecated 14) 15from cryptography.hazmat.primitives import hashes, serialization 16from cryptography.hazmat.primitives.asymmetric import ( 17 AsymmetricSignatureContext, AsymmetricVerificationContext, ec 18) 19 20 21def _check_signature_algorithm(signature_algorithm): 22 if not isinstance(signature_algorithm, ec.ECDSA): 23 raise UnsupportedAlgorithm( 24 "Unsupported elliptic curve signature algorithm.", 25 _Reasons.UNSUPPORTED_PUBLIC_KEY_ALGORITHM) 26 27 28def _ec_key_curve_sn(backend, ec_key): 29 group = backend._lib.EC_KEY_get0_group(ec_key) 30 backend.openssl_assert(group != backend._ffi.NULL) 31 32 nid = backend._lib.EC_GROUP_get_curve_name(group) 33 # The following check is to find EC keys with unnamed curves and raise 34 # an error for now. 35 if nid == backend._lib.NID_undef: 36 raise NotImplementedError( 37 "ECDSA certificates with unnamed curves are unsupported " 38 "at this time" 39 ) 40 41 curve_name = backend._lib.OBJ_nid2sn(nid) 42 backend.openssl_assert(curve_name != backend._ffi.NULL) 43 44 sn = backend._ffi.string(curve_name).decode('ascii') 45 return sn 46 47 48def _mark_asn1_named_ec_curve(backend, ec_cdata): 49 """ 50 Set the named curve flag on the EC_KEY. This causes OpenSSL to 51 serialize EC keys along with their curve OID which makes 52 deserialization easier. 53 """ 54 55 backend._lib.EC_KEY_set_asn1_flag( 56 ec_cdata, backend._lib.OPENSSL_EC_NAMED_CURVE 57 ) 58 59 60def _sn_to_elliptic_curve(backend, sn): 61 try: 62 return ec._CURVE_TYPES[sn]() 63 except KeyError: 64 raise UnsupportedAlgorithm( 65 "{0} is not a supported elliptic curve".format(sn), 66 _Reasons.UNSUPPORTED_ELLIPTIC_CURVE 67 ) 68 69 70def _ecdsa_sig_sign(backend, private_key, data): 71 max_size = backend._lib.ECDSA_size(private_key._ec_key) 72 backend.openssl_assert(max_size > 0) 73 74 sigbuf = backend._ffi.new("unsigned char[]", max_size) 75 siglen_ptr = backend._ffi.new("unsigned int[]", 1) 76 res = backend._lib.ECDSA_sign( 77 0, data, len(data), sigbuf, siglen_ptr, private_key._ec_key 78 ) 79 backend.openssl_assert(res == 1) 80 return backend._ffi.buffer(sigbuf)[:siglen_ptr[0]] 81 82 83def _ecdsa_sig_verify(backend, public_key, signature, data): 84 res = backend._lib.ECDSA_verify( 85 0, data, len(data), signature, len(signature), public_key._ec_key 86 ) 87 if res != 1: 88 backend._consume_errors() 89 raise InvalidSignature 90 91 92@utils.register_interface(AsymmetricSignatureContext) 93class _ECDSASignatureContext(object): 94 def __init__(self, backend, private_key, algorithm): 95 self._backend = backend 96 self._private_key = private_key 97 self._digest = hashes.Hash(algorithm, backend) 98 99 def update(self, data): 100 self._digest.update(data) 101 102 def finalize(self): 103 digest = self._digest.finalize() 104 105 return _ecdsa_sig_sign(self._backend, self._private_key, digest) 106 107 108@utils.register_interface(AsymmetricVerificationContext) 109class _ECDSAVerificationContext(object): 110 def __init__(self, backend, public_key, signature, algorithm): 111 self._backend = backend 112 self._public_key = public_key 113 self._signature = signature 114 self._digest = hashes.Hash(algorithm, backend) 115 116 def update(self, data): 117 self._digest.update(data) 118 119 def verify(self): 120 digest = self._digest.finalize() 121 _ecdsa_sig_verify( 122 self._backend, self._public_key, self._signature, digest 123 ) 124 125 126@utils.register_interface(ec.EllipticCurvePrivateKeyWithSerialization) 127class _EllipticCurvePrivateKey(object): 128 def __init__(self, backend, ec_key_cdata, evp_pkey): 129 self._backend = backend 130 _mark_asn1_named_ec_curve(backend, ec_key_cdata) 131 self._ec_key = ec_key_cdata 132 self._evp_pkey = evp_pkey 133 134 sn = _ec_key_curve_sn(backend, ec_key_cdata) 135 self._curve = _sn_to_elliptic_curve(backend, sn) 136 137 curve = utils.read_only_property("_curve") 138 139 @property 140 def key_size(self): 141 return self.curve.key_size 142 143 def signer(self, signature_algorithm): 144 _warn_sign_verify_deprecated() 145 _check_signature_algorithm(signature_algorithm) 146 _check_not_prehashed(signature_algorithm.algorithm) 147 return _ECDSASignatureContext( 148 self._backend, self, signature_algorithm.algorithm 149 ) 150 151 def exchange(self, algorithm, peer_public_key): 152 if not ( 153 self._backend.elliptic_curve_exchange_algorithm_supported( 154 algorithm, self.curve 155 ) 156 ): 157 raise UnsupportedAlgorithm( 158 "This backend does not support the ECDH algorithm.", 159 _Reasons.UNSUPPORTED_EXCHANGE_ALGORITHM 160 ) 161 162 if peer_public_key.curve.name != self.curve.name: 163 raise ValueError( 164 "peer_public_key and self are not on the same curve" 165 ) 166 167 group = self._backend._lib.EC_KEY_get0_group(self._ec_key) 168 z_len = (self._backend._lib.EC_GROUP_get_degree(group) + 7) // 8 169 self._backend.openssl_assert(z_len > 0) 170 z_buf = self._backend._ffi.new("uint8_t[]", z_len) 171 peer_key = self._backend._lib.EC_KEY_get0_public_key( 172 peer_public_key._ec_key 173 ) 174 175 r = self._backend._lib.ECDH_compute_key( 176 z_buf, z_len, peer_key, self._ec_key, self._backend._ffi.NULL 177 ) 178 self._backend.openssl_assert(r > 0) 179 return self._backend._ffi.buffer(z_buf)[:z_len] 180 181 def public_key(self): 182 group = self._backend._lib.EC_KEY_get0_group(self._ec_key) 183 self._backend.openssl_assert(group != self._backend._ffi.NULL) 184 185 curve_nid = self._backend._lib.EC_GROUP_get_curve_name(group) 186 187 public_ec_key = self._backend._lib.EC_KEY_new_by_curve_name(curve_nid) 188 self._backend.openssl_assert(public_ec_key != self._backend._ffi.NULL) 189 public_ec_key = self._backend._ffi.gc( 190 public_ec_key, self._backend._lib.EC_KEY_free 191 ) 192 193 point = self._backend._lib.EC_KEY_get0_public_key(self._ec_key) 194 self._backend.openssl_assert(point != self._backend._ffi.NULL) 195 196 res = self._backend._lib.EC_KEY_set_public_key(public_ec_key, point) 197 self._backend.openssl_assert(res == 1) 198 199 evp_pkey = self._backend._ec_cdata_to_evp_pkey(public_ec_key) 200 201 return _EllipticCurvePublicKey(self._backend, public_ec_key, evp_pkey) 202 203 def private_numbers(self): 204 bn = self._backend._lib.EC_KEY_get0_private_key(self._ec_key) 205 private_value = self._backend._bn_to_int(bn) 206 return ec.EllipticCurvePrivateNumbers( 207 private_value=private_value, 208 public_numbers=self.public_key().public_numbers() 209 ) 210 211 def private_bytes(self, encoding, format, encryption_algorithm): 212 return self._backend._private_key_bytes( 213 encoding, 214 format, 215 encryption_algorithm, 216 self._evp_pkey, 217 self._ec_key 218 ) 219 220 def sign(self, data, signature_algorithm): 221 _check_signature_algorithm(signature_algorithm) 222 data, algorithm = _calculate_digest_and_algorithm( 223 self._backend, data, signature_algorithm._algorithm 224 ) 225 return _ecdsa_sig_sign(self._backend, self, data) 226 227 228@utils.register_interface(ec.EllipticCurvePublicKeyWithSerialization) 229class _EllipticCurvePublicKey(object): 230 def __init__(self, backend, ec_key_cdata, evp_pkey): 231 self._backend = backend 232 _mark_asn1_named_ec_curve(backend, ec_key_cdata) 233 self._ec_key = ec_key_cdata 234 self._evp_pkey = evp_pkey 235 236 sn = _ec_key_curve_sn(backend, ec_key_cdata) 237 self._curve = _sn_to_elliptic_curve(backend, sn) 238 239 curve = utils.read_only_property("_curve") 240 241 @property 242 def key_size(self): 243 return self.curve.key_size 244 245 def verifier(self, signature, signature_algorithm): 246 _warn_sign_verify_deprecated() 247 utils._check_bytes("signature", signature) 248 249 _check_signature_algorithm(signature_algorithm) 250 _check_not_prehashed(signature_algorithm.algorithm) 251 return _ECDSAVerificationContext( 252 self._backend, self, signature, signature_algorithm.algorithm 253 ) 254 255 def public_numbers(self): 256 get_func, group = ( 257 self._backend._ec_key_determine_group_get_func(self._ec_key) 258 ) 259 point = self._backend._lib.EC_KEY_get0_public_key(self._ec_key) 260 self._backend.openssl_assert(point != self._backend._ffi.NULL) 261 262 with self._backend._tmp_bn_ctx() as bn_ctx: 263 bn_x = self._backend._lib.BN_CTX_get(bn_ctx) 264 bn_y = self._backend._lib.BN_CTX_get(bn_ctx) 265 266 res = get_func(group, point, bn_x, bn_y, bn_ctx) 267 self._backend.openssl_assert(res == 1) 268 269 x = self._backend._bn_to_int(bn_x) 270 y = self._backend._bn_to_int(bn_y) 271 272 return ec.EllipticCurvePublicNumbers( 273 x=x, 274 y=y, 275 curve=self._curve 276 ) 277 278 def _encode_point(self, format): 279 if format is serialization.PublicFormat.CompressedPoint: 280 conversion = self._backend._lib.POINT_CONVERSION_COMPRESSED 281 else: 282 assert format is serialization.PublicFormat.UncompressedPoint 283 conversion = self._backend._lib.POINT_CONVERSION_UNCOMPRESSED 284 285 group = self._backend._lib.EC_KEY_get0_group(self._ec_key) 286 self._backend.openssl_assert(group != self._backend._ffi.NULL) 287 point = self._backend._lib.EC_KEY_get0_public_key(self._ec_key) 288 self._backend.openssl_assert(point != self._backend._ffi.NULL) 289 with self._backend._tmp_bn_ctx() as bn_ctx: 290 buflen = self._backend._lib.EC_POINT_point2oct( 291 group, point, conversion, self._backend._ffi.NULL, 0, bn_ctx 292 ) 293 self._backend.openssl_assert(buflen > 0) 294 buf = self._backend._ffi.new("char[]", buflen) 295 res = self._backend._lib.EC_POINT_point2oct( 296 group, point, conversion, buf, buflen, bn_ctx 297 ) 298 self._backend.openssl_assert(buflen == res) 299 300 return self._backend._ffi.buffer(buf)[:] 301 302 def public_bytes(self, encoding, format): 303 if format is serialization.PublicFormat.PKCS1: 304 raise ValueError( 305 "EC public keys do not support PKCS1 serialization" 306 ) 307 308 if ( 309 encoding is serialization.Encoding.X962 or 310 format is serialization.PublicFormat.CompressedPoint or 311 format is serialization.PublicFormat.UncompressedPoint 312 ): 313 if ( 314 encoding is not serialization.Encoding.X962 or 315 format not in ( 316 serialization.PublicFormat.CompressedPoint, 317 serialization.PublicFormat.UncompressedPoint 318 ) 319 ): 320 raise ValueError( 321 "X962 encoding must be used with CompressedPoint or " 322 "UncompressedPoint format" 323 ) 324 325 return self._encode_point(format) 326 else: 327 return self._backend._public_key_bytes( 328 encoding, 329 format, 330 self, 331 self._evp_pkey, 332 None 333 ) 334 335 def verify(self, signature, data, signature_algorithm): 336 _check_signature_algorithm(signature_algorithm) 337 data, algorithm = _calculate_digest_and_algorithm( 338 self._backend, data, signature_algorithm._algorithm 339 ) 340 _ecdsa_sig_verify(self._backend, self, signature, data) 341