• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# This file is dual licensed under the terms of the Apache License, Version
2# 2.0, and the BSD License. See the LICENSE file in the root of this repository
3# for complete details.
4
5from __future__ import absolute_import, division, print_function
6
7import itertools
8import os
9import subprocess
10import sys
11import textwrap
12
13from pkg_resources import parse_version
14
15import pytest
16
17from cryptography import x509
18from cryptography.exceptions import InternalError, _Reasons
19from cryptography.hazmat.backends.interfaces import DHBackend, RSABackend
20from cryptography.hazmat.backends.openssl.backend import (
21    Backend, backend
22)
23from cryptography.hazmat.backends.openssl.ec import _sn_to_elliptic_curve
24from cryptography.hazmat.primitives import hashes, serialization
25from cryptography.hazmat.primitives.asymmetric import dh, dsa, padding
26from cryptography.hazmat.primitives.ciphers import Cipher
27from cryptography.hazmat.primitives.ciphers.algorithms import AES
28from cryptography.hazmat.primitives.ciphers.modes import CBC
29
30from ..primitives.fixtures_rsa import RSA_KEY_2048, RSA_KEY_512
31from ...doubles import (
32    DummyAsymmetricPadding, DummyCipherAlgorithm, DummyHashAlgorithm, DummyMode
33)
34from ...utils import (
35    load_nist_vectors, load_vectors_from_file, raises_unsupported_algorithm
36)
37from ...x509.test_x509 import _load_cert
38
39
def skip_if_libre_ssl(openssl_version):
    """
    Skip the calling test when ``openssl_version`` mentions LibreSSL.

    Returns ``None`` without doing anything when the substring
    ``LibreSSL`` does not occur in the supplied version text.
    """
    if u'LibreSSL' not in openssl_version:
        return
    pytest.skip("LibreSSL hard-codes RAND_bytes to use arc4random.")
43
44
class TestLibreSkip(object):
    """Exercises both outcomes of ``skip_if_libre_ssl``."""

    def test_skip_no(self):
        # An OpenSSL version string must pass through without skipping.
        outcome = skip_if_libre_ssl(u"OpenSSL 1.0.2h  3 May 2016")
        assert outcome is None

    def test_skip_yes(self):
        # A LibreSSL version string must raise pytest's skip exception.
        skip_exception = pytest.skip.Exception
        with pytest.raises(skip_exception):
            skip_if_libre_ssl(u"LibreSSL 2.1.6")
52
53
class DummyMGF(object):
    """Placeholder MGF object that only carries a zero ``_salt_length``."""

    _salt_length = 0
56
57
class TestOpenSSL(object):
    """Sanity checks run directly against the shared OpenSSL ``backend``."""

    def test_backend_exists(self):
        assert backend

    def test_openssl_version_text(self):
        """
        This test checks the value of OPENSSL_VERSION_TEXT.

        Unfortunately, this define does not appear to have a
        formal content definition, so for now we'll test to see
        if it starts with OpenSSL or LibreSSL as that appears
        to be true for every OpenSSL-alike.
        """
        version_text = backend.openssl_version_text()
        assert version_text.startswith(("OpenSSL", "LibreSSL"))

    def test_openssl_version_number(self):
        assert backend.openssl_version_number() > 0

    def test_supports_cipher(self):
        assert backend.cipher_supported(None, None) is False

    def test_register_duplicate_cipher_adapter(self):
        # Registering an adapter for AES/CBC on the shared backend must be
        # rejected with ValueError (presumably because one already exists).
        with pytest.raises(ValueError):
            backend.register_cipher_adapter(AES, CBC, None)

    @pytest.mark.parametrize("mode", [DummyMode(), None])
    def test_nonexistent_cipher(self, mode):
        # A fresh Backend instance is used so the dummy adapter is not
        # registered on the shared module-level ``backend``.
        b = Backend()
        # The adapter always hands back NULL, i.e. "no such cipher".
        b.register_cipher_adapter(
            DummyCipherAlgorithm,
            type(mode),
            lambda backend, cipher, mode: backend._ffi.NULL
        )
        cipher = Cipher(
            DummyCipherAlgorithm(), mode, backend=b,
        )
        with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_CIPHER):
            cipher.encryptor()

    def test_openssl_assert(self):
        backend.openssl_assert(True)
        with pytest.raises(InternalError):
            backend.openssl_assert(False)

    def test_consume_errors(self):
        # Push ten entries onto the OpenSSL error queue ...
        for _ in range(10):
            backend._lib.ERR_put_error(backend._lib.ERR_LIB_EVP, 0, 0,
                                       b"test_openssl.py", -1)

        assert backend._lib.ERR_peek_error() != 0

        # ... and check that _consume_errors drains and returns all of them.
        errors = backend._consume_errors()

        assert backend._lib.ERR_peek_error() == 0
        assert len(errors) == 10

    def test_ssl_ciphers_registered(self):
        meth = backend._lib.SSLv23_method()
        ctx = backend._lib.SSL_CTX_new(meth)
        assert ctx != backend._ffi.NULL
        backend._lib.SSL_CTX_free(ctx)

    def test_evp_ciphers_registered(self):
        cipher = backend._lib.EVP_get_cipherbyname(b"aes-256-cbc")
        assert cipher != backend._ffi.NULL

    def test_error_strings_loaded(self):
        buf = backend._ffi.new("char[]", 256)
        # 101183626 is the packed error code whose text is asserted below.
        backend._lib.ERR_error_string_n(101183626, buf, len(buf))
        assert b"data not multiple of block length" in backend._ffi.string(buf)

    def test_unknown_error_in_cipher_finalize(self):
        cipher = Cipher(AES(b"\0" * 16), CBC(b"\0" * 16), backend=backend)
        enc = cipher.encryptor()
        enc.update(b"\0")
        # Queue an error before finalizing; finalize() is then expected to
        # surface an InternalError.
        backend._lib.ERR_put_error(0, 0, 1,
                                   b"test_openssl.py", -1)
        with pytest.raises(InternalError):
            enc.finalize()

    def test_large_key_size_on_new_openssl(self):
        parameters = dsa.generate_parameters(2048, backend)
        param_num = parameters.parameter_numbers()
        assert param_num.p.bit_length() == 2048
        parameters = dsa.generate_parameters(3072, backend)
        param_num = parameters.parameter_numbers()
        assert param_num.p.bit_length() == 3072

    def test_int_to_bn(self):
        value = (2 ** 4242) - 4242
        bn = backend._int_to_bn(value)
        assert bn != backend._ffi.NULL
        # Tie the BIGNUM's release to the Python object's lifetime.
        bn = backend._ffi.gc(bn, backend._lib.BN_clear_free)

        assert bn
        assert backend._bn_to_int(bn) == value

    def test_int_to_bn_inplace(self):
        value = (2 ** 4242) - 4242
        bn_ptr = backend._lib.BN_new()
        assert bn_ptr != backend._ffi.NULL
        bn_ptr = backend._ffi.gc(bn_ptr, backend._lib.BN_free)
        # When a destination BIGNUM is supplied, the same pointer comes back.
        bn = backend._int_to_bn(value, bn_ptr)

        assert bn == bn_ptr
        assert backend._bn_to_int(bn_ptr) == value

    def test_bn_to_int(self):
        bn = backend._int_to_bn(0)
        assert bn != backend._ffi.NULL
        # Register the BIGNUM for cleanup, mirroring test_int_to_bn, so the
        # allocation is not leaked once the test finishes.
        bn = backend._ffi.gc(bn, backend._lib.BN_free)
        assert backend._bn_to_int(bn) == 0
171
172
class TestOpenSSLRandomEngine(object):
    """
    Tests for switching the default RAND engine on the shared ``backend``.

    The default engine is process-global state, so ``setup`` verifies the
    osrandom engine is active before each test and ``teardown`` restores it.
    """

    def setup(self):
        # The default RAND engine is global and shared between
        # tests. We make sure that the default engine is osrandom
        # before we start each test and restore the global state to
        # that engine in teardown.
        current_default = backend._lib.ENGINE_get_default_RAND()
        name = backend._lib.ENGINE_get_name(current_default)
        assert name == backend._binding._osrandom_engine_name

    def teardown(self):
        # we need to reset state to being default. backend is a shared global
        # for all these tests.
        backend.activate_osrandom_engine()
        current_default = backend._lib.ENGINE_get_default_RAND()
        name = backend._lib.ENGINE_get_name(current_default)
        assert name == backend._binding._osrandom_engine_name

    @pytest.mark.skipif(sys.executable is None,
                        reason="No Python interpreter available.")
    def test_osrandom_engine_is_default(self, tmpdir):
        # Script run in a fresh interpreter: it prints the name of whichever
        # RAND engine is the default right after importing the backend.
        engine_printer = textwrap.dedent(
            """
            import sys
            from cryptography.hazmat.backends.openssl.backend import backend

            e = backend._lib.ENGINE_get_default_RAND()
            name = backend._lib.ENGINE_get_name(e)
            sys.stdout.write(backend._ffi.string(name).decode('ascii'))
            res = backend._lib.ENGINE_free(e)
            assert res == 1
            """
        )
        engine_name = tmpdir.join('engine_name')

        # If we're running tests via ``python setup.py test`` in a clean
        # environment then all of our dependencies are going to be installed
        # into either the current directory or the .eggs directory. However the
        # subprocess won't know to activate these dependencies, so we'll get it
        # to do so by passing our entire sys.path into the subprocess via the
        # PYTHONPATH environment variable.
        env = os.environ.copy()
        env["PYTHONPATH"] = os.pathsep.join(sys.path)

        with engine_name.open('w') as out:
            subprocess.check_call(
                [sys.executable, "-c", engine_printer],
                env=env,
                stdout=out,
                stderr=subprocess.PIPE,
            )

        osrandom_engine_name = backend._ffi.string(
            backend._binding._osrandom_engine_name
        )

        assert engine_name.read().encode('ascii') == osrandom_engine_name

    def test_osrandom_sanity_check(self):
        # This test serves as a check against catastrophic failure.
        buf = backend._ffi.new("unsigned char[]", 500)
        res = backend._lib.RAND_bytes(buf, 500)
        assert res == 1
        # Compare against a bytes literal: slicing the cffi buffer yields
        # bytes, and on Python 3 bytes never equal a text string, which
        # would make this assertion pass even for an all-zero buffer.
        assert backend._ffi.buffer(buf)[:] != b"\x00" * 500

    def test_activate_osrandom_no_default(self):
        # With the builtin random active there is no default RAND engine.
        backend.activate_builtin_random()
        e = backend._lib.ENGINE_get_default_RAND()
        assert e == backend._ffi.NULL
        backend.activate_osrandom_engine()
        e = backend._lib.ENGINE_get_default_RAND()
        name = backend._lib.ENGINE_get_name(e)
        assert name == backend._binding._osrandom_engine_name
        res = backend._lib.ENGINE_free(e)
        assert res == 1

    def test_activate_builtin_random(self):
        e = backend._lib.ENGINE_get_default_RAND()
        assert e != backend._ffi.NULL
        name = backend._lib.ENGINE_get_name(e)
        assert name == backend._binding._osrandom_engine_name
        res = backend._lib.ENGINE_free(e)
        assert res == 1
        backend.activate_builtin_random()
        e = backend._lib.ENGINE_get_default_RAND()
        assert e == backend._ffi.NULL

    def test_activate_builtin_random_already_active(self):
        # Activating the builtin random twice must be harmless.
        backend.activate_builtin_random()
        e = backend._lib.ENGINE_get_default_RAND()
        assert e == backend._ffi.NULL
        backend.activate_builtin_random()
        e = backend._lib.ENGINE_get_default_RAND()
        assert e == backend._ffi.NULL

    def test_osrandom_engine_implementation(self):
        name = backend.osrandom_engine_implementation()
        assert name in ['/dev/urandom', 'CryptGenRandom', 'getentropy',
                        'getrandom']
        if sys.platform.startswith('linux'):
            assert name in ['getrandom', '/dev/urandom']
        if sys.platform == 'darwin':
            # macOS 10.12+ supports getentropy
            if parse_version(os.uname()[2]) >= parse_version("16.0"):
                assert name == 'getentropy'
            else:
                assert name == '/dev/urandom'
        if sys.platform == 'win32':
            assert name == 'CryptGenRandom'

    def test_activate_osrandom_already_default(self):
        # Activating osrandom when it is already the default must leave it
        # as the default.
        e = backend._lib.ENGINE_get_default_RAND()
        name = backend._lib.ENGINE_get_name(e)
        assert name == backend._binding._osrandom_engine_name
        res = backend._lib.ENGINE_free(e)
        assert res == 1
        backend.activate_osrandom_engine()
        e = backend._lib.ENGINE_get_default_RAND()
        name = backend._lib.ENGINE_get_name(e)
        assert name == backend._binding._osrandom_engine_name
        res = backend._lib.ENGINE_free(e)
        assert res == 1
295
296
class TestOpenSSLRSA(object):
    """RSA key-generation limits and padding support on the backend."""

    def test_generate_rsa_parameters_supported(self):
        supported = backend.generate_rsa_parameters_supported
        assert supported(1, 1024) is False
        assert supported(4, 1024) is False
        assert supported(3, 1024) is True
        assert supported(3, 511) is False

    def test_generate_bad_public_exponent(self):
        # Both exponents must be rejected individually.
        for bad_exponent in (1, 4):
            with pytest.raises(ValueError):
                backend.generate_rsa_private_key(
                    public_exponent=bad_exponent, key_size=2048
                )

    def test_cant_generate_insecure_tiny_key(self):
        # Both key sizes must be rejected individually.
        for tiny_size in (511, 256):
            with pytest.raises(ValueError):
                backend.generate_rsa_private_key(
                    public_exponent=65537, key_size=tiny_size
                )

    def test_rsa_padding_unsupported_pss_mgf1_hash(self):
        mgf = padding.MGF1(DummyHashAlgorithm())
        pss = padding.PSS(mgf=mgf, salt_length=0)
        assert backend.rsa_padding_supported(pss) is False

    def test_rsa_padding_unsupported(self):
        dummy_padding = DummyAsymmetricPadding()
        assert backend.rsa_padding_supported(dummy_padding) is False

    def test_rsa_padding_supported_pkcs1v15(self):
        pkcs1 = padding.PKCS1v15()
        assert backend.rsa_padding_supported(pkcs1) is True

    def test_rsa_padding_supported_pss(self):
        mgf = padding.MGF1(hashes.SHA1())
        pss = padding.PSS(mgf=mgf, salt_length=0)
        assert backend.rsa_padding_supported(pss) is True

    def test_rsa_padding_supported_oaep(self):
        oaep = padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA1()),
            algorithm=hashes.SHA1(),
            label=None
        )
        assert backend.rsa_padding_supported(oaep) is True

    @pytest.mark.skipif(
        backend._lib.Cryptography_HAS_RSA_OAEP_MD == 0,
        reason="Requires OpenSSL with rsa_oaep_md (1.0.2+)"
    )
    def test_rsa_padding_supported_oaep_sha2_combinations(self):
        digests = [
            hashes.SHA1(),
            hashes.SHA224(),
            hashes.SHA256(),
            hashes.SHA384(),
            hashes.SHA512(),
        ]
        # Every (MGF1 hash, OAEP hash) pairing must be reported as supported.
        for mgf_digest, oaep_digest in itertools.product(digests, repeat=2):
            oaep = padding.OAEP(
                mgf=padding.MGF1(algorithm=mgf_digest),
                algorithm=oaep_digest,
                label=None
            )
            assert backend.rsa_padding_supported(oaep) is True

    def test_rsa_padding_unsupported_mgf(self):
        oaep = padding.OAEP(
            mgf=DummyMGF(),
            algorithm=hashes.SHA1(),
            label=None
        )
        assert backend.rsa_padding_supported(oaep) is False

        pss = padding.PSS(mgf=DummyMGF(), salt_length=0)
        assert backend.rsa_padding_supported(pss) is False

    @pytest.mark.skipif(
        backend._lib.Cryptography_HAS_RSA_OAEP_MD == 1,
        reason="Requires OpenSSL without rsa_oaep_md (< 1.0.2)"
    )
    def test_unsupported_mgf1_hash_algorithm_decrypt(self):
        key = RSA_KEY_512.private_key(backend)
        oaep = padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA1(),
            label=None
        )
        with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_PADDING):
            key.decrypt(b"0" * 64, oaep)

    @pytest.mark.skipif(
        backend._lib.Cryptography_HAS_RSA_OAEP_MD == 1,
        reason="Requires OpenSSL without rsa_oaep_md (< 1.0.2)"
    )
    def test_unsupported_oaep_hash_algorithm_decrypt(self):
        key = RSA_KEY_512.private_key(backend)
        oaep = padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA1()),
            algorithm=hashes.SHA256(),
            label=None
        )
        with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_PADDING):
            key.decrypt(b"0" * 64, oaep)

    def test_unsupported_mgf1_hash_algorithm_md5_decrypt(self):
        key = RSA_KEY_512.private_key(backend)
        oaep = padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.MD5()),
            algorithm=hashes.MD5(),
            label=None
        )
        with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_PADDING):
            key.decrypt(b"0" * 64, oaep)
422
423
class TestOpenSSLCMAC(object):
    """CMAC context creation with a cipher the backend cannot handle."""

    def test_unsupported_cipher(self):
        dummy_cipher = DummyCipherAlgorithm()
        with raises_unsupported_algorithm(_Reasons.UNSUPPORTED_CIPHER):
            backend.create_cmac_ctx(dummy_cipher)
428
429
class TestOpenSSLSignX509Certificate(object):
    """Argument validation of ``backend.create_x509_certificate``."""

    def test_requires_certificate_builder(self):
        signing_key = RSA_KEY_2048.private_key(backend)
        # A plain object stands in for the builder argument.
        not_a_builder = object()
        digest = DummyHashAlgorithm()

        with pytest.raises(TypeError):
            backend.create_x509_certificate(
                not_a_builder, signing_key, digest
            )
438
439
class TestOpenSSLSignX509CertificateRevocationList(object):
    """Argument validation of ``backend.create_x509_crl``."""

    def test_invalid_builder(self):
        signing_key = RSA_KEY_2048.private_key(backend)
        # A plain object stands in for the builder argument.
        not_a_builder = object()
        digest = hashes.SHA256()

        with pytest.raises(TypeError):
            backend.create_x509_crl(not_a_builder, signing_key, digest)
446
447
class TestOpenSSLCreateRevokedCertificate(object):
    """Argument validation of ``backend.create_x509_revoked_certificate``."""

    def test_invalid_builder(self):
        # A plain object stands in for the builder argument.
        not_a_builder = object()
        with pytest.raises(TypeError):
            backend.create_x509_revoked_certificate(not_a_builder)
452
453
class TestOpenSSLSerializationWithOpenSSL(object):
    """PEM password callback and key (de)serialization edge cases."""

    def test_pem_password_cb(self):
        secret = b"abcdefg"
        capacity = 10

        cb_data = backend._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *")
        # Hold the C copy of the password in a local while the struct
        # points at it.
        secret_c = backend._ffi.new("char []", secret)
        cb_data.password = secret_c
        cb_data.length = len(secret)

        out = backend._ffi.new("char []", capacity)
        written = backend._lib.Cryptography_pem_password_cb(
            out, capacity, 0, cb_data
        )

        assert written == len(secret)
        assert cb_data.called == 1
        assert backend._ffi.buffer(out, len(secret))[:] == secret
        assert cb_data.maxsize == capacity
        assert cb_data.error == 0

    def test_pem_password_cb_no_password(self):
        capacity = 10
        # The struct is left without a password assigned.
        cb_data = backend._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *")
        out = backend._ffi.new("char []", capacity)

        written = backend._lib.Cryptography_pem_password_cb(
            out, capacity, 0, cb_data
        )

        assert written == 0
        assert cb_data.error == -1

    def test_unsupported_evp_pkey_type(self):
        # A freshly created EVP_PKEY is handed to both converters.
        empty_pkey = backend._create_evp_pkey_gc()
        with raises_unsupported_algorithm(None):
            backend._evp_pkey_to_private_key(empty_pkey)
        with raises_unsupported_algorithm(None):
            backend._evp_pkey_to_public_key(empty_pkey)

    def test_very_long_pem_serialization_password(self):
        long_password = b"x" * 1024
        vector_path = os.path.join(
            "asymmetric", "Traditional_OpenSSL_Serialization", "key1.pem"
        )

        def load_with_long_password(pemfile):
            pem_bytes = pemfile.read().encode()
            return backend.load_pem_private_key(pem_bytes, long_password)

        with pytest.raises(ValueError):
            load_vectors_from_file(vector_path, load_with_long_password)
504
505
class TestOpenSSLEllipticCurve(object):
    """Short-name to curve lookup for a name that is not a known curve."""

    def test_sn_to_elliptic_curve_not_supported(self):
        expected_reason = _Reasons.UNSUPPORTED_ELLIPTIC_CURVE
        with raises_unsupported_algorithm(expected_reason):
            _sn_to_elliptic_curve(backend, b"fake")
510
511
@pytest.mark.requires_backend_interface(interface=RSABackend)
class TestRSAPEMSerialization(object):
    """PEM serialization of RSA private keys with an oversized password."""

    def test_password_length_limit(self):
        rsa_key = RSA_KEY_2048.private_key(backend)
        oversized_password = b"x" * 1024
        pem = serialization.Encoding.PEM
        pkcs8 = serialization.PrivateFormat.PKCS8
        # A 1024-byte password must be rejected with ValueError.
        with pytest.raises(ValueError):
            rsa_key.private_bytes(
                pem,
                pkcs8,
                serialization.BestAvailableEncryption(oversized_password)
            )
523
524
class TestGOSTCertificate(object):
    """Subject parsing of the ``e-trust.ru.der`` certificate vector."""

    def test_numeric_string_x509_name_entry(self):
        cert_path = os.path.join("x509", "e-trust.ru.der")
        cert = _load_cert(cert_path, x509.load_der_x509_certificate, backend)

        if not backend._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_102I:
            # The subject parses here, so check the attribute value directly.
            oid = x509.ObjectIdentifier("1.2.643.3.131.1.1")
            matches = cert.subject.get_attributes_for_oid(oid)
            assert matches[0].value == "007710474375"
            return

        with pytest.raises(ValueError) as exc:
            cert.subject

        # We assert on the message in this case because if the certificate
        # fails to load it will also raise a ValueError and this test could
        # erroneously pass.
        assert str(exc.value) == "Unsupported ASN1 string type. Type: 18"
544
545
@pytest.mark.skipif(
    backend._lib.Cryptography_HAS_EVP_PKEY_DHX == 1,
    reason="Requires OpenSSL without EVP_PKEY_DHX (< 1.0.2)")
@pytest.mark.requires_backend_interface(interface=DHBackend)
class TestOpenSSLDHSerialization(object):
    """
    DH (de)serialization failures expected when EVP_PKEY_DHX is missing.

    The whole class is skipped when ``Cryptography_HAS_EVP_PKEY_DHX`` is 1.
    """

    @pytest.mark.parametrize(
        "vector",
        load_vectors_from_file(
            os.path.join("asymmetric", "DH", "RFC5114.txt"),
            load_nist_vectors))
    def test_dh_serialization_with_q_unsupported(self, backend, vector):
        # Build DH numbers that include a q value from the hex vector fields.
        p = int(vector["p"], 16)
        g = int(vector["g"], 16)
        q = int(vector["q"], 16)
        param_numbers = dh.DHParameterNumbers(p, g, q)
        public_numbers = dh.DHPublicNumbers(
            int(vector["ystatcavs"], 16), param_numbers
        )
        private_numbers = dh.DHPrivateNumbers(
            int(vector["xstatcavs"], 16), public_numbers
        )
        dh_private = private_numbers.private_key(backend)
        dh_public = dh_private.public_key()

        pem = serialization.Encoding.PEM
        unsupported = _Reasons.UNSUPPORTED_SERIALIZATION

        # Private key, public key and parameters must each refuse to
        # serialize.
        with raises_unsupported_algorithm(unsupported):
            dh_private.private_bytes(
                pem,
                serialization.PrivateFormat.PKCS8,
                serialization.NoEncryption()
            )
        with raises_unsupported_algorithm(unsupported):
            dh_public.public_bytes(
                pem, serialization.PublicFormat.SubjectPublicKeyInfo
            )
        with raises_unsupported_algorithm(unsupported):
            param_numbers.parameters(backend).parameter_bytes(
                pem, serialization.ParameterFormat.PKCS3
            )

    @pytest.mark.parametrize(
        ("key_path", "loader_func"),
        [
            (
                os.path.join("asymmetric", "DH", "dhkey_rfc5114_2.pem"),
                serialization.load_pem_private_key,
            ),
            (
                os.path.join("asymmetric", "DH", "dhkey_rfc5114_2.der"),
                serialization.load_der_private_key,
            )
        ]
    )
    def test_private_load_dhx_unsupported(self, key_path, loader_func,
                                          backend):
        def read_all(keyfile):
            return keyfile.read()

        raw_key = load_vectors_from_file(key_path, read_all, mode="rb")
        # Loading the private key with no password must fail.
        with pytest.raises(ValueError):
            loader_func(raw_key, None, backend)

    @pytest.mark.parametrize(
        ("key_path", "loader_func"),
        [
            (
                os.path.join("asymmetric", "DH", "dhpub_rfc5114_2.pem"),
                serialization.load_pem_public_key,
            ),
            (
                os.path.join("asymmetric", "DH", "dhpub_rfc5114_2.der"),
                serialization.load_der_public_key,
            )
        ]
    )
    def test_public_load_dhx_unsupported(self, key_path, loader_func,
                                         backend):
        def read_all(keyfile):
            return keyfile.read()

        raw_key = load_vectors_from_file(key_path, read_all, mode="rb")
        # Loading the public key must fail.
        with pytest.raises(ValueError):
            loader_func(raw_key, backend)
621