# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.

from __future__ import absolute_import, division, print_function

import binascii
import itertools
import os

import pytest

from cryptography.exceptions import (
    AlreadyFinalized,
    AlreadyUpdated,
    InvalidSignature,
    InvalidTag,
    NotYetFinalized,
)
from cryptography.hazmat.primitives import hashes, hmac, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.ciphers import Cipher
from cryptography.hazmat.primitives.ciphers.modes import GCM
from cryptography.hazmat.primitives.kdf.hkdf import HKDF, HKDFExpand
from cryptography.hazmat.primitives.kdf.kbkdf import (
    CounterLocation,
    KBKDFHMAC,
    Mode,
)
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

from ...utils import load_vectors_from_file


def _load_all_params(path, file_names, param_loader):
    """Load and concatenate the vectors of every file in ``file_names``.

    Each file name is joined onto ``path`` and handed to
    ``load_vectors_from_file`` together with ``param_loader``; the results
    are collected, in order, into a single list.
    """
    all_params = []
    for file_name in file_names:
        all_params.extend(
            load_vectors_from_file(os.path.join(path, file_name), param_loader)
        )
    return all_params


def generate_encrypt_test(
    param_loader, path, file_names, cipher_factory, mode_factory
):
    """Return a test method parametrized over the loaded cipher vectors.

    The returned function takes ``(self, backend, params)`` and delegates
    to :func:`encrypt_test`.
    """
    all_params = _load_all_params(path, file_names, param_loader)

    @pytest.mark.parametrize("params", all_params)
    def test_encryption(self, backend, params):
        encrypt_test(backend, cipher_factory, mode_factory, params)

    return test_encryption


def encrypt_test(backend, cipher_factory, mode_factory, params):
    """Check one vector encrypts to its ciphertext and decrypts back.

    ``params`` is passed as keyword arguments to both factories and must
    contain hex encoded ``"plaintext"`` and ``"ciphertext"`` entries.
    """
    assert backend.cipher_supported(
        cipher_factory(**params), mode_factory(**params)
    )

    plaintext = params["plaintext"]
    ciphertext = params["ciphertext"]
    cipher = Cipher(
        cipher_factory(**params), mode_factory(**params), backend=backend
    )
    encryptor = cipher.encryptor()
    actual_ciphertext = encryptor.update(binascii.unhexlify(plaintext))
    actual_ciphertext += encryptor.finalize()
    assert actual_ciphertext == binascii.unhexlify(ciphertext)
    decryptor = cipher.decryptor()
    actual_plaintext = decryptor.update(binascii.unhexlify(ciphertext))
    actual_plaintext += decryptor.finalize()
    assert actual_plaintext == binascii.unhexlify(plaintext)


def generate_aead_test(
    param_loader, path, file_names, cipher_factory, mode_factory
):
    """Return a test method parametrized over the loaded AEAD vectors.

    The returned function takes ``(self, backend, params)`` and delegates
    to :func:`aead_test`.
    """
    all_params = _load_all_params(path, file_names, param_loader)

    @pytest.mark.parametrize("params", all_params)
    def test_aead(self, backend, params):
        aead_test(backend, cipher_factory, mode_factory, params)

    return test_aead


def aead_test(backend, cipher_factory, mode_factory, params):
    """Run a single AEAD vector through the given cipher and mode.

    Vectors whose ``"pt"`` entry is missing or ``None`` are ignored. For a
    vector flagged ``"fail"``, decryption must raise ``InvalidTag`` on
    finalize. Otherwise the vector is encrypted (the produced tag,
    truncated to the vector's tag length, must match) and then decrypted
    back to the plaintext.
    """
    if mode_factory is GCM and len(params["iv"]) < 16:
        # 16 because this is hex encoded data
        pytest.skip("Less than 64-bit IVs are no longer supported")

    if (
        mode_factory is GCM
        and backend._fips_enabled
        and len(params["iv"]) != 24
    ):
        # Red Hat disables non-96-bit IV support as part of its FIPS
        # patches. The check is for a length of 24 because the value is
        # hex encoded (24 hex characters == 12 bytes == 96 bits).
        pytest.skip("Non-96-bit IVs unsupported in FIPS mode.")

    if params.get("pt") is None:
        return

    # Decode the fields shared by both branches once instead of repeating
    # the unhexlify calls at every use.
    key = binascii.unhexlify(params["key"])
    iv = binascii.unhexlify(params["iv"])
    tag = binascii.unhexlify(params["tag"])
    aad = binascii.unhexlify(params["aad"])
    ciphertext = binascii.unhexlify(params["ct"])
    tag_len = len(tag)

    if params.get("fail") is True:
        cipher = Cipher(
            cipher_factory(key),
            mode_factory(iv, tag, min_tag_length=tag_len),
            backend,
        )
        decryptor = cipher.decryptor()
        decryptor.authenticate_additional_data(aad)
        # The returned data is irrelevant here; only the tag check done by
        # finalize() matters.
        decryptor.update(ciphertext)
        with pytest.raises(InvalidTag):
            decryptor.finalize()
        return

    plaintext = binascii.unhexlify(params["pt"])

    cipher = Cipher(cipher_factory(key), mode_factory(iv, None), backend)
    encryptor = cipher.encryptor()
    encryptor.authenticate_additional_data(aad)
    actual_ciphertext = encryptor.update(plaintext)
    actual_ciphertext += encryptor.finalize()
    assert binascii.hexlify(encryptor.tag[:tag_len]) == params["tag"]

    cipher = Cipher(
        cipher_factory(key),
        mode_factory(iv, tag, min_tag_length=tag_len),
        backend,
    )
    decryptor = cipher.decryptor()
    decryptor.authenticate_additional_data(aad)
    actual_plaintext = decryptor.update(ciphertext)
    actual_plaintext += decryptor.finalize()
    assert actual_plaintext == plaintext


def generate_stream_encryption_test(
    param_loader, path, file_names, cipher_factory
):
    """Return a test method parametrized over stream cipher vectors.

    The returned function takes ``(self, backend, params)`` and delegates
    to :func:`stream_encryption_test`.
    """
    all_params = _load_all_params(path, file_names, param_loader)

    @pytest.mark.parametrize("params", all_params)
    def test_stream_encryption(self, backend, params):
        stream_encryption_test(backend, cipher_factory, params)

    return test_stream_encryption


def stream_encryption_test(backend, cipher_factory, params):
    """Check a stream cipher vector at the keystream offset it specifies.

    ``params["offset"]`` zero bytes are fed through the encryptor and the
    decryptor first so that the real data is processed at that offset.
    """
    plaintext = params["plaintext"]
    ciphertext = params["ciphertext"]
    offset = params["offset"]
    cipher = Cipher(cipher_factory(**params), None, backend=backend)
    encryptor = cipher.encryptor()
    # throw away offset bytes
    encryptor.update(b"\x00" * int(offset))
    actual_ciphertext = encryptor.update(binascii.unhexlify(plaintext))
    actual_ciphertext += encryptor.finalize()
    assert actual_ciphertext == binascii.unhexlify(ciphertext)
    decryptor = cipher.decryptor()
    # throw away offset bytes
    decryptor.update(b"\x00" * int(offset))
    actual_plaintext = decryptor.update(binascii.unhexlify(ciphertext))
    actual_plaintext += decryptor.finalize()
    assert actual_plaintext == binascii.unhexlify(plaintext)


def generate_hash_test(param_loader, path, file_names, hash_cls):
    """Return a test method parametrized over the loaded hash vectors.

    The returned function takes ``(self, backend, params)`` and delegates
    to :func:`hash_test`.
    """
    all_params = _load_all_params(path, file_names, param_loader)

    @pytest.mark.parametrize("params", all_params)
    def test_hash(self, backend, params):
        hash_test(backend, hash_cls, params)

    return test_hash


def hash_test(backend, algorithm, params):
    """Hash the vector's message and compare it with the expected digest.

    ``params`` unpacks to ``(msg, md)``; spaces are stripped from ``md``
    and it is lower-cased before being decoded from hex.
    """
    msg, md = params
    m = hashes.Hash(algorithm, backend=backend)
    m.update(binascii.unhexlify(msg))
    expected_md = md.replace(" ", "").lower().encode("ascii")
    assert m.finalize() == binascii.unhexlify(expected_md)


def generate_base_hash_test(algorithm, digest_size):
    """Return a test method that delegates to :func:`base_hash_test`."""

    def test_base_hash(self, backend):
        base_hash_test(backend, algorithm, digest_size)

    return test_base_hash


def base_hash_test(backend, algorithm, digest_size):
    """Check the digest size and the copy semantics of a hash context.

    A copy must be a distinct object with a distinct ``_ctx``, and a copy
    fed the same subsequent data must produce the same digest.
    """
    m = hashes.Hash(algorithm, backend=backend)
    assert m.algorithm.digest_size == digest_size
    m_copy = m.copy()
    assert m != m_copy
    assert m._ctx != m_copy._ctx

    m.update(b"abc")
    copy = m.copy()
    copy.update(b"123")
    m.update(b"123")
    assert copy.finalize() == m.finalize()


def generate_base_hmac_test(hash_cls):
    """Return a test method that delegates to :func:`base_hmac_test`."""

    def test_base_hmac(self, backend):
        base_hmac_test(backend, hash_cls)

    return test_base_hmac


def base_hmac_test(backend, algorithm):
    """Check that copying an HMAC yields a distinct object and context."""
    key = b"ab"
    h = hmac.HMAC(binascii.unhexlify(key), algorithm, backend=backend)
    h_copy = h.copy()
    assert h != h_copy
    assert h._ctx != h_copy._ctx


def generate_hmac_test(param_loader, path, file_names, algorithm):
    """Return a test method parametrized over the loaded HMAC vectors.

    The returned function takes ``(self, backend, params)`` and delegates
    to :func:`hmac_test`.
    """
    all_params = _load_all_params(path, file_names, param_loader)

    @pytest.mark.parametrize("params", all_params)
    def test_hmac(self, backend, params):
        hmac_test(backend, algorithm, params)

    return test_hmac


def hmac_test(backend, algorithm, params):
    """Compute the HMAC of the vector's message and compare the digest.

    ``params`` unpacks to ``(msg, md, key)``, all hex encoded.
    """
    msg, md, key = params
    h = hmac.HMAC(binascii.unhexlify(key), algorithm, backend=backend)
    h.update(binascii.unhexlify(msg))
    assert h.finalize() == binascii.unhexlify(md.encode("ascii"))


def generate_pbkdf2_test(param_loader, path, file_names, algorithm):
    """Return a test method parametrized over the loaded PBKDF2 vectors.

    The returned function takes ``(self, backend, params)`` and delegates
    to :func:`pbkdf2_test`.
    """
    all_params = _load_all_params(path, file_names, param_loader)

    @pytest.mark.parametrize("params", all_params)
    def test_pbkdf2(self, backend, params):
        pbkdf2_test(backend, algorithm, params)

    return test_pbkdf2


def pbkdf2_test(backend, algorithm, params):
    """Derive a key with PBKDF2HMAC and compare it with the vector.

    The hex encoding of the derived key must equal
    ``params["derived_key"]``.
    """
    # NOTE: the salt and password are used exactly as supplied in params.
    # This helper performs no "\0" unescaping, so the loader must already
    # provide the final values.
    kdf = PBKDF2HMAC(
        algorithm,
        int(params["length"]),
        params["salt"],
        int(params["iterations"]),
        backend,
    )
    derived_key = kdf.derive(params["password"])
    assert binascii.hexlify(derived_key) == params["derived_key"]


def generate_aead_exception_test(cipher_factory, mode_factory):
    """Return a test method that delegates to :func:`aead_exception_test`."""

    def test_aead_exception(self, backend):
        aead_exception_test(backend, cipher_factory, mode_factory)

    return test_aead_exception


def aead_exception_test(backend, cipher_factory, mode_factory):
    """Check the state-dependent exceptions raised by AEAD contexts.

    Covers reading the tag before finalize, adding AAD after update, any
    use after finalize, and reading ``tag`` on a decryptor.
    """
    cipher = Cipher(
        cipher_factory(binascii.unhexlify(b"0" * 32)),
        mode_factory(binascii.unhexlify(b"0" * 24)),
        backend,
    )
    encryptor = cipher.encryptor()
    encryptor.update(b"a" * 16)
    with pytest.raises(NotYetFinalized):
        encryptor.tag
    with pytest.raises(AlreadyUpdated):
        encryptor.authenticate_additional_data(b"b" * 16)
    encryptor.finalize()
    with pytest.raises(AlreadyFinalized):
        encryptor.authenticate_additional_data(b"b" * 16)
    with pytest.raises(AlreadyFinalized):
        encryptor.update(b"b" * 16)
    with pytest.raises(AlreadyFinalized):
        encryptor.finalize()
    cipher = Cipher(
        cipher_factory(binascii.unhexlify(b"0" * 32)),
        mode_factory(binascii.unhexlify(b"0" * 24), b"0" * 16),
        backend,
    )
    decryptor = cipher.decryptor()
    decryptor.update(b"a" * 16)
    with pytest.raises(AttributeError):
        decryptor.tag


def generate_aead_tag_exception_test(cipher_factory, mode_factory):
    """Return a test method delegating to :func:`aead_tag_exception_test`."""

    def test_aead_tag_exception(self, backend):
        aead_tag_exception_test(backend, cipher_factory, mode_factory)

    return test_aead_tag_exception


def aead_tag_exception_test(backend, cipher_factory, mode_factory):
    """Check the ``ValueError`` cases around AEAD tags.

    The mode must reject the short tags used below, and creating an
    encryptor from a cipher whose mode was given a tag must fail.
    """
    key = binascii.unhexlify(b"0" * 32)
    iv = binascii.unhexlify(b"0" * 24)

    # Building a cipher without a tag must not raise; the resulting object
    # itself is not needed.
    Cipher(cipher_factory(key), mode_factory(iv), backend)

    with pytest.raises(ValueError):
        mode_factory(iv, b"000")

    with pytest.raises(ValueError):
        mode_factory(iv, b"000000", 2)

    cipher = Cipher(
        cipher_factory(key),
        mode_factory(iv, b"0" * 16),
        backend,
    )
    with pytest.raises(ValueError):
        cipher.encryptor()


def hkdf_derive_test(backend, algorithm, params):
    """Check ``HKDF.derive`` of the vector's IKM against its OKM.

    Empty salt or info values are passed to HKDF as ``None``.
    """
    hkdf = HKDF(
        algorithm,
        int(params["l"]),
        salt=binascii.unhexlify(params["salt"]) or None,
        info=binascii.unhexlify(params["info"]) or None,
        backend=backend,
    )

    okm = hkdf.derive(binascii.unhexlify(params["ikm"]))

    assert okm == binascii.unhexlify(params["okm"])


def hkdf_extract_test(backend, algorithm, params):
    """Check the private ``HKDF._extract`` step against the vector's PRK.

    Empty salt or info values are passed to HKDF as ``None``.
    """
    hkdf = HKDF(
        algorithm,
        int(params["l"]),
        salt=binascii.unhexlify(params["salt"]) or None,
        info=binascii.unhexlify(params["info"]) or None,
        backend=backend,
    )

    prk = hkdf._extract(binascii.unhexlify(params["ikm"]))

    assert prk == binascii.unhexlify(params["prk"])


def hkdf_expand_test(backend, algorithm, params):
    """Check ``HKDFExpand.derive`` of the vector's PRK against its OKM.

    An empty info value is passed to HKDFExpand as ``None``.
    """
    hkdf = HKDFExpand(
        algorithm,
        int(params["l"]),
        info=binascii.unhexlify(params["info"]) or None,
        backend=backend,
    )

    okm = hkdf.derive(binascii.unhexlify(params["prk"]))

    assert okm == binascii.unhexlify(params["okm"])


def generate_hkdf_test(param_loader, path, file_names, algorithm):
    """Return a test method covering extract, expand and derive.

    The returned function is parametrized over the product of the loaded
    vectors and the three ``hkdf_*_test`` helpers.
    """
    all_params = _load_all_params(path, file_names, param_loader)

    all_tests = [hkdf_extract_test, hkdf_expand_test, hkdf_derive_test]

    @pytest.mark.parametrize(
        ("params", "hkdf_test"), itertools.product(all_params, all_tests)
    )
    def test_hkdf(self, backend, params, hkdf_test):
        hkdf_test(backend, algorithm, params)

    return test_hkdf


def generate_kbkdf_counter_mode_test(param_loader, path, file_names):
    """Return a test method parametrized over the loaded KBKDF vectors.

    The returned function takes ``(self, backend, params)`` and delegates
    to :func:`kbkdf_counter_mode_test`.
    """
    all_params = _load_all_params(path, file_names, param_loader)

    @pytest.mark.parametrize("params", all_params)
    def test_kbkdf(self, backend, params):
        kbkdf_counter_mode_test(backend, params)

    return test_kbkdf


def kbkdf_counter_mode_test(backend, params):
    """Run one counter-mode KBKDF vector through ``KBKDFHMAC``.

    The test is skipped when the vector's ``"prf"`` is not one of the
    HMAC algorithms listed below (or the backend lacks HMAC support for
    it), or when its ``"ctrlocation"`` is not a known counter location.
    """
    supported_algorithms = {
        "hmac_sha1": hashes.SHA1,
        "hmac_sha224": hashes.SHA224,
        "hmac_sha256": hashes.SHA256,
        "hmac_sha384": hashes.SHA384,
        "hmac_sha512": hashes.SHA512,
    }

    supported_counter_locations = {
        "before_fixed": CounterLocation.BeforeFixed,
        "after_fixed": CounterLocation.AfterFixed,
    }

    algorithm = supported_algorithms.get(params.get("prf"))
    if algorithm is None or not backend.hmac_supported(algorithm()):
        pytest.skip(
            "KBKDF does not support algorithm: {}".format(params.get("prf"))
        )

    # Every value in the table above is a CounterLocation member, so a
    # failed lookup (None) is the only unsupported case.
    ctr_loc = supported_counter_locations.get(params.get("ctrlocation"))
    if ctr_loc is None:
        pytest.skip(
            "Does not support counter location: {}".format(
                params.get("ctrlocation")
            )
        )

    ctrkdf = KBKDFHMAC(
        algorithm(),
        Mode.CounterMode,
        params["l"] // 8,
        params["rlen"] // 8,
        None,
        ctr_loc,
        None,
        None,
        binascii.unhexlify(params["fixedinputdata"]),
        backend=backend,
    )

    ko = ctrkdf.derive(binascii.unhexlify(params["ki"]))
    assert binascii.hexlify(ko) == params["ko"]


def generate_rsa_verification_test(
    param_loader, path, file_names, hash_alg, pad_factory
):
    """Return a test method parametrized over matching RSA vectors.

    Only vectors whose ``"algorithm"`` equals the upper-cased name of
    ``hash_alg`` are kept. The returned function delegates to
    :func:`rsa_verification_test`.
    """
    all_params = _load_all_params(path, file_names, param_loader)
    all_params = [
        i for i in all_params if i["algorithm"] == hash_alg.name.upper()
    ]

    @pytest.mark.parametrize("params", all_params)
    def test_rsa_verification(self, backend, params):
        rsa_verification_test(backend, params, hash_alg, pad_factory)

    return test_rsa_verification


def rsa_verification_test(backend, params, hash_alg, pad_factory):
    """Verify one RSA signature vector.

    When ``params["fail"]`` is truthy, verification must raise
    ``InvalidSignature``; otherwise it must complete without raising.
    """
    public_numbers = rsa.RSAPublicNumbers(
        e=params["public_exponent"], n=params["modulus"]
    )
    public_key = public_numbers.public_key(backend)
    pad = pad_factory(params, hash_alg)
    signature = binascii.unhexlify(params["s"])
    msg = binascii.unhexlify(params["msg"])
    if params["fail"]:
        with pytest.raises(InvalidSignature):
            public_key.verify(signature, msg, pad, hash_alg)
    else:
        public_key.verify(signature, msg, pad, hash_alg)


def _check_rsa_private_numbers(skey):
    """Assert that RSA private numbers are present and self-consistent.

    Checks that ``p * q == n`` and that the CRT values match the ones
    recomputed by the ``rsa.rsa_crt_*`` helpers.
    """
    assert skey
    pkey = skey.public_numbers
    assert pkey
    assert pkey.e
    assert pkey.n
    assert skey.d
    assert skey.p * skey.q == pkey.n
    assert skey.dmp1 == rsa.rsa_crt_dmp1(skey.d, skey.p)
    assert skey.dmq1 == rsa.rsa_crt_dmq1(skey.d, skey.q)
    assert skey.iqmp == rsa.rsa_crt_iqmp(skey.p, skey.q)


def _check_dsa_private_numbers(skey):
    """Assert that ``g ** x mod p`` equals the public value ``y``."""
    assert skey
    pkey = skey.public_numbers
    params = pkey.parameter_numbers
    assert pow(params.g, skey.x, params.p) == pkey.y


def skip_fips_traditional_openssl(backend, fmt):
    """Skip the current test for TraditionalOpenSSL format in FIPS mode."""
    if (
        fmt is serialization.PrivateFormat.TraditionalOpenSSL
        and backend._fips_enabled
    ):
        pytest.skip(
            "Traditional OpenSSL key format is not supported in FIPS mode."
        )