1# This file is dual licensed under the terms of the Apache License, Version 2# 2.0, and the BSD License. See the LICENSE file in the root of this repository 3# for complete details. 4 5from __future__ import absolute_import, division, print_function 6 7from cryptography import utils 8from cryptography.exceptions import UnsupportedAlgorithm, _Reasons 9from cryptography.hazmat.primitives import serialization 10from cryptography.hazmat.primitives.asymmetric import dh 11 12 13def _dh_params_dup(dh_cdata, backend): 14 lib = backend._lib 15 ffi = backend._ffi 16 17 param_cdata = lib.DHparams_dup(dh_cdata) 18 backend.openssl_assert(param_cdata != ffi.NULL) 19 param_cdata = ffi.gc(param_cdata, lib.DH_free) 20 if lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_102: 21 # In OpenSSL versions < 1.0.2 or libressl DHparams_dup don't copy q 22 q = ffi.new("BIGNUM **") 23 lib.DH_get0_pqg(dh_cdata, ffi.NULL, q, ffi.NULL) 24 q_dup = lib.BN_dup(q[0]) 25 res = lib.DH_set0_pqg(param_cdata, ffi.NULL, q_dup, ffi.NULL) 26 backend.openssl_assert(res == 1) 27 28 return param_cdata 29 30 31def _dh_cdata_to_parameters(dh_cdata, backend): 32 param_cdata = _dh_params_dup(dh_cdata, backend) 33 return _DHParameters(backend, param_cdata) 34 35 36@utils.register_interface(dh.DHParametersWithSerialization) 37class _DHParameters(object): 38 def __init__(self, backend, dh_cdata): 39 self._backend = backend 40 self._dh_cdata = dh_cdata 41 42 def parameter_numbers(self): 43 p = self._backend._ffi.new("BIGNUM **") 44 g = self._backend._ffi.new("BIGNUM **") 45 q = self._backend._ffi.new("BIGNUM **") 46 self._backend._lib.DH_get0_pqg(self._dh_cdata, p, q, g) 47 self._backend.openssl_assert(p[0] != self._backend._ffi.NULL) 48 self._backend.openssl_assert(g[0] != self._backend._ffi.NULL) 49 if q[0] == self._backend._ffi.NULL: 50 q_val = None 51 else: 52 q_val = self._backend._bn_to_int(q[0]) 53 return dh.DHParameterNumbers( 54 p=self._backend._bn_to_int(p[0]), 55 g=self._backend._bn_to_int(g[0]), 56 q=q_val 57 ) 58 59 def generate_private_key(self): 60 return self._backend.generate_dh_private_key(self) 61 62 def parameter_bytes(self, encoding, format): 63 if format is not serialization.ParameterFormat.PKCS3: 64 raise ValueError( 65 "Only PKCS3 serialization is supported" 66 ) 67 if not self._backend._lib.Cryptography_HAS_EVP_PKEY_DHX: 68 q = self._backend._ffi.new("BIGNUM **") 69 self._backend._lib.DH_get0_pqg(self._dh_cdata, 70 self._backend._ffi.NULL, 71 q, 72 self._backend._ffi.NULL) 73 if q[0] != self._backend._ffi.NULL: 74 raise UnsupportedAlgorithm( 75 "DH X9.42 serialization is not supported", 76 _Reasons.UNSUPPORTED_SERIALIZATION) 77 78 return self._backend._parameter_bytes( 79 encoding, 80 format, 81 self._dh_cdata 82 ) 83 84 85def _handle_dh_compute_key_error(errors, backend): 86 lib = backend._lib 87 88 backend.openssl_assert( 89 errors[0]._lib_reason_match( 90 lib.ERR_LIB_DH, lib.DH_R_INVALID_PUBKEY 91 ) 92 ) 93 94 raise ValueError("Public key value is invalid for this exchange.") 95 96 97def _get_dh_num_bits(backend, dh_cdata): 98 p = backend._ffi.new("BIGNUM **") 99 backend._lib.DH_get0_pqg(dh_cdata, p, 100 backend._ffi.NULL, 101 backend._ffi.NULL) 102 backend.openssl_assert(p[0] != backend._ffi.NULL) 103 return backend._lib.BN_num_bits(p[0]) 104 105 106@utils.register_interface(dh.DHPrivateKeyWithSerialization) 107class _DHPrivateKey(object): 108 def __init__(self, backend, dh_cdata, evp_pkey): 109 self._backend = backend 110 self._dh_cdata = dh_cdata 111 self._evp_pkey = evp_pkey 112 self._key_size_bytes = self._backend._lib.DH_size(dh_cdata) 113 114 @property 115 def key_size(self): 116 return _get_dh_num_bits(self._backend, self._dh_cdata) 117 118 def private_numbers(self): 119 p = self._backend._ffi.new("BIGNUM **") 120 g = self._backend._ffi.new("BIGNUM **") 121 q = self._backend._ffi.new("BIGNUM **") 122 self._backend._lib.DH_get0_pqg(self._dh_cdata, p, q, g) 123 self._backend.openssl_assert(p[0] != self._backend._ffi.NULL) 124 self._backend.openssl_assert(g[0] != self._backend._ffi.NULL) 125 if q[0] == self._backend._ffi.NULL: 126 q_val = None 127 else: 128 q_val = self._backend._bn_to_int(q[0]) 129 pub_key = self._backend._ffi.new("BIGNUM **") 130 priv_key = self._backend._ffi.new("BIGNUM **") 131 self._backend._lib.DH_get0_key(self._dh_cdata, pub_key, priv_key) 132 self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL) 133 self._backend.openssl_assert(priv_key[0] != self._backend._ffi.NULL) 134 return dh.DHPrivateNumbers( 135 public_numbers=dh.DHPublicNumbers( 136 parameter_numbers=dh.DHParameterNumbers( 137 p=self._backend._bn_to_int(p[0]), 138 g=self._backend._bn_to_int(g[0]), 139 q=q_val 140 ), 141 y=self._backend._bn_to_int(pub_key[0]) 142 ), 143 x=self._backend._bn_to_int(priv_key[0]) 144 ) 145 146 def exchange(self, peer_public_key): 147 148 buf = self._backend._ffi.new("unsigned char[]", self._key_size_bytes) 149 pub_key = self._backend._ffi.new("BIGNUM **") 150 self._backend._lib.DH_get0_key(peer_public_key._dh_cdata, pub_key, 151 self._backend._ffi.NULL) 152 self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL) 153 res = self._backend._lib.DH_compute_key( 154 buf, 155 pub_key[0], 156 self._dh_cdata 157 ) 158 159 if res == -1: 160 errors = self._backend._consume_errors() 161 return _handle_dh_compute_key_error(errors, self._backend) 162 else: 163 self._backend.openssl_assert(res >= 1) 164 165 key = self._backend._ffi.buffer(buf)[:res] 166 pad = self._key_size_bytes - len(key) 167 168 if pad > 0: 169 key = (b"\x00" * pad) + key 170 171 return key 172 173 def public_key(self): 174 dh_cdata = _dh_params_dup(self._dh_cdata, self._backend) 175 pub_key = self._backend._ffi.new("BIGNUM **") 176 self._backend._lib.DH_get0_key(self._dh_cdata, 177 pub_key, self._backend._ffi.NULL) 178 self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL) 179 pub_key_dup = self._backend._lib.BN_dup(pub_key[0]) 180 self._backend.openssl_assert(pub_key_dup != self._backend._ffi.NULL) 181 182 res = self._backend._lib.DH_set0_key(dh_cdata, 183 pub_key_dup, 184 self._backend._ffi.NULL) 185 self._backend.openssl_assert(res == 1) 186 evp_pkey = self._backend._dh_cdata_to_evp_pkey(dh_cdata) 187 return _DHPublicKey(self._backend, dh_cdata, evp_pkey) 188 189 def parameters(self): 190 return _dh_cdata_to_parameters(self._dh_cdata, self._backend) 191 192 def private_bytes(self, encoding, format, encryption_algorithm): 193 if format is not serialization.PrivateFormat.PKCS8: 194 raise ValueError( 195 "DH private keys support only PKCS8 serialization" 196 ) 197 if not self._backend._lib.Cryptography_HAS_EVP_PKEY_DHX: 198 q = self._backend._ffi.new("BIGNUM **") 199 self._backend._lib.DH_get0_pqg(self._dh_cdata, 200 self._backend._ffi.NULL, 201 q, 202 self._backend._ffi.NULL) 203 if q[0] != self._backend._ffi.NULL: 204 raise UnsupportedAlgorithm( 205 "DH X9.42 serialization is not supported", 206 _Reasons.UNSUPPORTED_SERIALIZATION) 207 208 return self._backend._private_key_bytes( 209 encoding, 210 format, 211 encryption_algorithm, 212 self._evp_pkey, 213 self._dh_cdata 214 ) 215 216 217@utils.register_interface(dh.DHPublicKeyWithSerialization) 218class _DHPublicKey(object): 219 def __init__(self, backend, dh_cdata, evp_pkey): 220 self._backend = backend 221 self._dh_cdata = dh_cdata 222 self._evp_pkey = evp_pkey 223 self._key_size_bits = _get_dh_num_bits(self._backend, self._dh_cdata) 224 225 @property 226 def key_size(self): 227 return self._key_size_bits 228 229 def public_numbers(self): 230 p = self._backend._ffi.new("BIGNUM **") 231 g = self._backend._ffi.new("BIGNUM **") 232 q = self._backend._ffi.new("BIGNUM **") 233 self._backend._lib.DH_get0_pqg(self._dh_cdata, p, q, g) 234 self._backend.openssl_assert(p[0] != self._backend._ffi.NULL) 235 self._backend.openssl_assert(g[0] != self._backend._ffi.NULL) 236 if q[0] == self._backend._ffi.NULL: 237 q_val = None 238 else: 239 q_val = self._backend._bn_to_int(q[0]) 240 pub_key = self._backend._ffi.new("BIGNUM **") 241 self._backend._lib.DH_get0_key(self._dh_cdata, 242 pub_key, self._backend._ffi.NULL) 243 self._backend.openssl_assert(pub_key[0] != self._backend._ffi.NULL) 244 return dh.DHPublicNumbers( 245 parameter_numbers=dh.DHParameterNumbers( 246 p=self._backend._bn_to_int(p[0]), 247 g=self._backend._bn_to_int(g[0]), 248 q=q_val 249 ), 250 y=self._backend._bn_to_int(pub_key[0]) 251 ) 252 253 def parameters(self): 254 return _dh_cdata_to_parameters(self._dh_cdata, self._backend) 255 256 def public_bytes(self, encoding, format): 257 if format is not serialization.PublicFormat.SubjectPublicKeyInfo: 258 raise ValueError( 259 "DH public keys support only " 260 "SubjectPublicKeyInfo serialization" 261 ) 262 263 if not self._backend._lib.Cryptography_HAS_EVP_PKEY_DHX: 264 q = self._backend._ffi.new("BIGNUM **") 265 self._backend._lib.DH_get0_pqg(self._dh_cdata, 266 self._backend._ffi.NULL, 267 q, 268 self._backend._ffi.NULL) 269 if q[0] != self._backend._ffi.NULL: 270 raise UnsupportedAlgorithm( 271 "DH X9.42 serialization is not supported", 272 _Reasons.UNSUPPORTED_SERIALIZATION) 273 274 return self._backend._public_key_bytes( 275 encoding, 276 format, 277 self, 278 self._evp_pkey, 279 None 280 ) 281