• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
4
5from __future__ import absolute_import, division, print_function
6
7from cryptography import utils
8from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
9from cryptography.hazmat.primitives import serialization
10from cryptography.hazmat.primitives.asymmetric import dh
11
12
def _dh_params_dup(dh_cdata, backend):
    """
    Return a garbage-collected copy of the parameters held in ``dh_cdata``.

    The copy is produced by ``DHparams_dup`` and wrapped with ``ffi.gc`` so
    that ``DH_free`` is called on it when it is collected. When the bindings
    report ``CRYPTOGRAPHY_OPENSSL_LESS_THAN_102`` the ``q`` value is copied
    by hand as well.

    :param dh_cdata: The ``DH *`` whose parameters are duplicated.
    :param backend: Backend exposing ``_lib``, ``_ffi`` and
        ``openssl_assert``.
    :return: The duplicated ``DH *``.
    """
    lib = backend._lib
    ffi = backend._ffi

    param_cdata = lib.DHparams_dup(dh_cdata)
    backend.openssl_assert(param_cdata != ffi.NULL)
    param_cdata = ffi.gc(param_cdata, lib.DH_free)
    if lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_102:
        # In OpenSSL versions < 1.0.2 or libressl DHparams_dup doesn't copy q
        q = ffi.new("BIGNUM **")
        lib.DH_get0_pqg(dh_cdata, ffi.NULL, q, ffi.NULL)
        # q is optional; only copy it when the source actually carries one.
        if q[0] != ffi.NULL:
            q_dup = lib.BN_dup(q[0])
            # A failed duplication must not silently drop q from the copy
            # (same check public_key() applies to its BN_dup result).
            backend.openssl_assert(q_dup != ffi.NULL)
            res = lib.DH_set0_pqg(param_cdata, ffi.NULL, q_dup, ffi.NULL)
            backend.openssl_assert(res == 1)

    return param_cdata
29
30
def _dh_cdata_to_parameters(dh_cdata, backend):
    """
    Build a ``_DHParameters`` object around a duplicate of ``dh_cdata``.

    The parameters are copied with ``_dh_params_dup`` so the returned object
    does not share the caller's ``DH *``.
    """
    duplicated = _dh_params_dup(dh_cdata, backend)
    parameters = _DHParameters(backend, duplicated)
    return parameters
34
35
@utils.register_interface(dh.DHParametersWithSerialization)
class _DHParameters(object):
    """
    DH parameters stored in a ``DH *`` and accessed through ``backend``.
    """

    def __init__(self, backend, dh_cdata):
        self._backend = backend
        self._dh_cdata = dh_cdata

    def parameter_numbers(self):
        """
        Read p, g and the optional q out of the ``DH *`` and return them as
        a ``dh.DHParameterNumbers``; ``q`` is ``None`` when it is not set.
        """
        backend = self._backend
        ffi = backend._ffi

        p_ptr = ffi.new("BIGNUM **")
        g_ptr = ffi.new("BIGNUM **")
        q_ptr = ffi.new("BIGNUM **")
        backend._lib.DH_get0_pqg(self._dh_cdata, p_ptr, q_ptr, g_ptr)
        # p and g are mandatory; q may legitimately be missing.
        backend.openssl_assert(p_ptr[0] != ffi.NULL)
        backend.openssl_assert(g_ptr[0] != ffi.NULL)
        q_val = (
            None if q_ptr[0] == ffi.NULL else backend._bn_to_int(q_ptr[0])
        )
        return dh.DHParameterNumbers(
            p=backend._bn_to_int(p_ptr[0]),
            g=backend._bn_to_int(g_ptr[0]),
            q=q_val
        )

    def generate_private_key(self):
        """
        Ask the backend to generate a private key for these parameters.
        """
        backend = self._backend
        return backend.generate_dh_private_key(self)

    def parameter_bytes(self, encoding, format):
        """
        Serialize the parameters via the backend.

        :raises ValueError: If ``format`` is not ``ParameterFormat.PKCS3``.
        :raises UnsupportedAlgorithm: If the bindings lack
            ``Cryptography_HAS_EVP_PKEY_DHX`` and the parameters carry a q.
        """
        if format is not serialization.ParameterFormat.PKCS3:
            raise ValueError("Only PKCS3 serialization is supported")

        backend = self._backend
        if not backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
            ffi = backend._ffi
            q_ptr = ffi.new("BIGNUM **")
            backend._lib.DH_get0_pqg(
                self._dh_cdata, ffi.NULL, q_ptr, ffi.NULL
            )
            if q_ptr[0] != ffi.NULL:
                raise UnsupportedAlgorithm(
                    "DH X9.42 serialization is not supported",
                    _Reasons.UNSUPPORTED_SERIALIZATION
                )

        return backend._parameter_bytes(encoding, format, self._dh_cdata)
83
84
def _handle_dh_compute_key_error(errors, backend):
    """
    Turn the errors left behind by a failed key computation into a
    ``ValueError``.

    :param errors: Consumed backend errors; only the first one is inspected.
    :param backend: Backend exposing ``_lib`` and ``openssl_assert``.
    :raises ValueError: Always, after the first error has been checked
        against ``ERR_LIB_DH`` / ``DH_R_INVALID_PUBKEY``.
    """
    first_error = errors[0]
    is_invalid_pubkey = first_error._lib_reason_match(
        backend._lib.ERR_LIB_DH, backend._lib.DH_R_INVALID_PUBKEY
    )
    # The only failure expected here is an invalid peer public key; the
    # check result is handed to openssl_assert before raising.
    backend.openssl_assert(is_invalid_pubkey)

    raise ValueError("Public key value is invalid for this exchange.")
95
96
def _get_dh_num_bits(backend, dh_cdata):
    """
    Return ``BN_num_bits`` of the p value stored in ``dh_cdata``.

    :param backend: Backend exposing ``_lib``, ``_ffi`` and
        ``openssl_assert``.
    :param dh_cdata: The ``DH *`` to inspect.
    """
    ffi = backend._ffi
    lib = backend._lib

    p_ptr = ffi.new("BIGNUM **")
    # Only p is needed, so q and g are not requested.
    lib.DH_get0_pqg(dh_cdata, p_ptr, ffi.NULL, ffi.NULL)
    backend.openssl_assert(p_ptr[0] != ffi.NULL)
    return lib.BN_num_bits(p_ptr[0])
104
105
@utils.register_interface(dh.DHPrivateKeyWithSerialization)
class _DHPrivateKey(object):
    """
    DH private key stored in a ``DH *`` with its matching ``EVP_PKEY``.
    """

    def __init__(self, backend, dh_cdata, evp_pkey):
        self._backend = backend
        self._dh_cdata = dh_cdata
        self._evp_pkey = evp_pkey
        # Cached because exchange() uses it for the buffer size and padding.
        self._key_size_bytes = backend._lib.DH_size(dh_cdata)

    @property
    def key_size(self):
        """
        Size in bits, as reported by ``_get_dh_num_bits``.
        """
        return _get_dh_num_bits(self._backend, self._dh_cdata)

    def private_numbers(self):
        """
        Return the key as ``dh.DHPrivateNumbers`` (parameters, public value
        y and private value x); ``q`` is ``None`` when it is not set.
        """
        backend = self._backend
        ffi = backend._ffi
        lib = backend._lib

        p_ptr = ffi.new("BIGNUM **")
        g_ptr = ffi.new("BIGNUM **")
        q_ptr = ffi.new("BIGNUM **")
        lib.DH_get0_pqg(self._dh_cdata, p_ptr, q_ptr, g_ptr)
        backend.openssl_assert(p_ptr[0] != ffi.NULL)
        backend.openssl_assert(g_ptr[0] != ffi.NULL)
        q_val = (
            None if q_ptr[0] == ffi.NULL else backend._bn_to_int(q_ptr[0])
        )

        pub_ptr = ffi.new("BIGNUM **")
        priv_ptr = ffi.new("BIGNUM **")
        lib.DH_get0_key(self._dh_cdata, pub_ptr, priv_ptr)
        backend.openssl_assert(pub_ptr[0] != ffi.NULL)
        backend.openssl_assert(priv_ptr[0] != ffi.NULL)

        parameter_numbers = dh.DHParameterNumbers(
            p=backend._bn_to_int(p_ptr[0]),
            g=backend._bn_to_int(g_ptr[0]),
            q=q_val
        )
        public_numbers = dh.DHPublicNumbers(
            parameter_numbers=parameter_numbers,
            y=backend._bn_to_int(pub_ptr[0])
        )
        return dh.DHPrivateNumbers(
            public_numbers=public_numbers,
            x=backend._bn_to_int(priv_ptr[0])
        )

    def exchange(self, peer_public_key):
        """
        Compute the shared key with ``peer_public_key``.

        The result is left-padded with zero bytes up to ``DH_size`` of this
        key. When ``DH_compute_key`` returns -1 the consumed errors are
        passed to ``_handle_dh_compute_key_error``, which raises.
        """
        backend = self._backend
        ffi = backend._ffi
        lib = backend._lib

        out_buf = ffi.new("unsigned char[]", self._key_size_bytes)
        peer_pub = ffi.new("BIGNUM **")
        lib.DH_get0_key(peer_public_key._dh_cdata, peer_pub, ffi.NULL)
        backend.openssl_assert(peer_pub[0] != ffi.NULL)
        res = lib.DH_compute_key(out_buf, peer_pub[0], self._dh_cdata)

        if res == -1:
            errors = backend._consume_errors()
            return _handle_dh_compute_key_error(errors, backend)

        backend.openssl_assert(res >= 1)

        key = ffi.buffer(out_buf)[:res]
        missing = self._key_size_bytes - len(key)
        # A non-positive repeat count yields an empty prefix, so the key is
        # only extended when it is shorter than the full size.
        return (b"\x00" * missing) + key

    def public_key(self):
        """
        Return a ``_DHPublicKey`` built from a copy of the parameters and a
        copy of this key's public value.
        """
        backend = self._backend
        ffi = backend._ffi
        lib = backend._lib

        new_cdata = _dh_params_dup(self._dh_cdata, backend)
        pub_ptr = ffi.new("BIGNUM **")
        lib.DH_get0_key(self._dh_cdata, pub_ptr, ffi.NULL)
        backend.openssl_assert(pub_ptr[0] != ffi.NULL)
        pub_copy = lib.BN_dup(pub_ptr[0])
        backend.openssl_assert(pub_copy != ffi.NULL)

        # Only the public value is set on the copy; the private slot is NULL.
        res = lib.DH_set0_key(new_cdata, pub_copy, ffi.NULL)
        backend.openssl_assert(res == 1)
        evp_pkey = backend._dh_cdata_to_evp_pkey(new_cdata)
        return _DHPublicKey(backend, new_cdata, evp_pkey)

    def parameters(self):
        """
        Return the key's parameters as a separate ``_DHParameters`` object.
        """
        return _dh_cdata_to_parameters(self._dh_cdata, self._backend)

    def private_bytes(self, encoding, format, encryption_algorithm):
        """
        Serialize the private key via the backend.

        :raises ValueError: If ``format`` is not ``PrivateFormat.PKCS8``.
        :raises UnsupportedAlgorithm: If the bindings lack
            ``Cryptography_HAS_EVP_PKEY_DHX`` and the key carries a q.
        """
        if format is not serialization.PrivateFormat.PKCS8:
            raise ValueError(
                "DH private keys support only PKCS8 serialization"
            )

        backend = self._backend
        if not backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
            ffi = backend._ffi
            q_ptr = ffi.new("BIGNUM **")
            backend._lib.DH_get0_pqg(
                self._dh_cdata, ffi.NULL, q_ptr, ffi.NULL
            )
            if q_ptr[0] != ffi.NULL:
                raise UnsupportedAlgorithm(
                    "DH X9.42 serialization is not supported",
                    _Reasons.UNSUPPORTED_SERIALIZATION
                )

        return backend._private_key_bytes(
            encoding,
            format,
            encryption_algorithm,
            self._evp_pkey,
            self._dh_cdata
        )
215
216
@utils.register_interface(dh.DHPublicKeyWithSerialization)
class _DHPublicKey(object):
    """
    DH public key stored in a ``DH *`` with its matching ``EVP_PKEY``.
    """

    def __init__(self, backend, dh_cdata, evp_pkey):
        self._backend = backend
        self._dh_cdata = dh_cdata
        self._evp_pkey = evp_pkey
        # Computed once at construction and served by the key_size property.
        self._key_size_bits = _get_dh_num_bits(backend, dh_cdata)

    @property
    def key_size(self):
        """
        Size in bits, as computed when the object was created.
        """
        return self._key_size_bits

    def public_numbers(self):
        """
        Return the key as ``dh.DHPublicNumbers`` (parameters plus public
        value y); ``q`` is ``None`` when it is not set.
        """
        backend = self._backend
        ffi = backend._ffi
        lib = backend._lib

        p_ptr = ffi.new("BIGNUM **")
        g_ptr = ffi.new("BIGNUM **")
        q_ptr = ffi.new("BIGNUM **")
        lib.DH_get0_pqg(self._dh_cdata, p_ptr, q_ptr, g_ptr)
        backend.openssl_assert(p_ptr[0] != ffi.NULL)
        backend.openssl_assert(g_ptr[0] != ffi.NULL)
        q_val = (
            None if q_ptr[0] == ffi.NULL else backend._bn_to_int(q_ptr[0])
        )

        pub_ptr = ffi.new("BIGNUM **")
        lib.DH_get0_key(self._dh_cdata, pub_ptr, ffi.NULL)
        backend.openssl_assert(pub_ptr[0] != ffi.NULL)

        parameter_numbers = dh.DHParameterNumbers(
            p=backend._bn_to_int(p_ptr[0]),
            g=backend._bn_to_int(g_ptr[0]),
            q=q_val
        )
        return dh.DHPublicNumbers(
            parameter_numbers=parameter_numbers,
            y=backend._bn_to_int(pub_ptr[0])
        )

    def parameters(self):
        """
        Return the key's parameters as a separate ``_DHParameters`` object.
        """
        return _dh_cdata_to_parameters(self._dh_cdata, self._backend)

    def public_bytes(self, encoding, format):
        """
        Serialize the public key via the backend.

        :raises ValueError: If ``format`` is not
            ``PublicFormat.SubjectPublicKeyInfo``.
        :raises UnsupportedAlgorithm: If the bindings lack
            ``Cryptography_HAS_EVP_PKEY_DHX`` and the key carries a q.
        """
        if format is not serialization.PublicFormat.SubjectPublicKeyInfo:
            raise ValueError(
                "DH public keys support only "
                "SubjectPublicKeyInfo serialization"
            )

        backend = self._backend
        if not backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
            ffi = backend._ffi
            q_ptr = ffi.new("BIGNUM **")
            backend._lib.DH_get0_pqg(
                self._dh_cdata, ffi.NULL, q_ptr, ffi.NULL
            )
            if q_ptr[0] != ffi.NULL:
                raise UnsupportedAlgorithm(
                    "DH X9.42 serialization is not supported",
                    _Reasons.UNSUPPORTED_SERIALIZATION
                )

        return backend._public_key_bytes(
            encoding,
            format,
            self,
            self._evp_pkey,
            None
        )
281