# SPDX-License-Identifier: GPL-2.0-only
# This file is part of Scapy
# See https://scapy.net/ for more information
# Copyright (C) 2008 Arnaud Ebalard <arnaud.ebalard@eads.net>
#                                   <arno@natisbad.org>
#   2015, 2016, 2017 Maxence Tury   <maxence.tury@ssi.gouv.fr>

"""
High-level methods for PKI objects (X.509 certificates, CRLs, asymmetric keys).
Supports both RSA and ECDSA objects.

The classes below are wrappers for the ASN.1 objects defined in x509.py.
By collecting their attributes, we bypass the ASN.1 structure, hence
there is no direct method for exporting a new full DER-encoded version
of a Cert instance after its serial has been modified (for example).
If you need to modify an import, just use the corresponding ASN1_Packet.

For instance, here is what you could do in order to modify the serial of
'cert' and then resign it with whatever 'key'::

    f = open('cert.der')
    c = X509_Cert(f.read())
    c.tbsCertificate.serialNumber = 0x4B1D
    k = PrivKey('key.pem')
    new_x509_cert = k.resignCert(c)

No need for obnoxious openssl tweaking anymore. :)
"""

import base64
import os
import time

from scapy.config import conf, crypto_validator
from scapy.error import warning
from scapy.utils import binrepr
from scapy.asn1.asn1 import ASN1_BIT_STRING
from scapy.asn1.mib import hash_by_oid
from scapy.layers.x509 import (
    ECDSAPrivateKey_OpenSSL,
    ECDSAPrivateKey,
    ECDSAPublicKey,
    EdDSAPublicKey,
    EdDSAPrivateKey,
    RSAPrivateKey_OpenSSL,
    RSAPrivateKey,
    RSAPublicKey,
    X509_Cert,
    X509_CRL,
    X509_SubjectPublicKeyInfo,
)
from scapy.layers.tls.crypto.pkcs1 import pkcs_os2ip, _get_hash, \
    _EncryptAndVerifyRSA, _DecryptAndSignRSA
from scapy.compat import raw, bytes_encode

if conf.crypto_valid:
    from cryptography.exceptions import InvalidSignature
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import rsa, ec, x25519

    # cryptography raised the minimum RSA key length to 1024 in 43.0+
    # https://github.com/pyca/cryptography/pull/10278
    # but we need still 512 for EXPORT40 ciphers (yes EXPORT is terrible)
    # https://datatracker.ietf.org/doc/html/rfc2246#autoid-66
    # The following detects the change and hacks around it using the backend

    try:
        rsa.generate_private_key(public_exponent=65537, key_size=512)
        _RSA_512_SUPPORTED = True
    except ValueError:
        # cryptography > 43.0
        _RSA_512_SUPPORTED = False
        from cryptography.hazmat.primitives.asymmetric.rsa import rust_openssl


# Maximum allowed size in bytes for a certificate file, to avoid
# loading huge file when importing a cert
_MAX_KEY_SIZE = 50 * 1024
_MAX_CERT_SIZE = 50 * 1024
_MAX_CRL_SIZE = 10 * 1024 * 1024   # some are that big


#####################################################################
# Some helpers
#####################################################################

@conf.commands.register
def der2pem(der_string, obj="UNKNOWN"):
    """Convert DER octet string to PEM format (with optional header)

    :param der_string: the DER-encoded bytes to wrap.
    :param obj: label advertised in the BEGIN/END lines. Both str and
        bytes labels are accepted.
    :return: the PEM-encoded object, as bytes.
    """
    # The key factories below pass bytes markers (e.g. b"PUBLIC KEY");
    # formatting bytes with %s would leak the b'...' repr into the header.
    if isinstance(obj, bytes):
        obj = obj.decode()
    # Encode a byte string in PEM format. Header advertises <obj> type.
    pem_string = ("-----BEGIN %s-----\n" % obj).encode()
    base64_string = base64.b64encode(der_string)
    chunks = [base64_string[i:i + 64] for i in range(0, len(base64_string), 64)]  # noqa: E501
    pem_string += b'\n'.join(chunks)
    pem_string += ("\n-----END %s-----\n" % obj).encode()
    return pem_string


@conf.commands.register
def pem2der(pem_string):
    """Convert PEM string to DER format

    :param pem_string: bytes holding exactly one PEM-encoded object.
    :return: the base64-decoded payload, as bytes.
    :raises Exception: if a second BEGIN marker follows the first header.
    """
    # Encode all lines between the first '-----\n' and the 2nd-to-last '-----'.
    pem_string = pem_string.replace(b"\r", b"")
    first_idx = pem_string.find(b"-----\n") + 6
    if pem_string.find(b"-----BEGIN", first_idx) != -1:
        raise Exception("pem2der() expects only one PEM-encoded object")
    last_idx = pem_string.rfind(b"-----", 0, pem_string.rfind(b"-----"))
    base64_string = pem_string[first_idx:last_idx]
    # bytes.replace() returns a new object: keep the result
    base64_string = base64_string.replace(b"\n", b"")
    der_string = base64.b64decode(base64_string)
    return der_string


def split_pem(s):
    """
    Split PEM objects. Useful to process concatenated certificates.

    :param s: bytes holding zero or more concatenated PEM objects.
    :return: a list with one bytes entry per PEM object, in order.
    :raises Exception: if a BEGIN tag is found without any END tag.
    """
    pem_strings = []
    while s != b"":
        start_idx = s.find(b"-----BEGIN")
        if start_idx == -1:
            break
        end_idx = s.find(b"-----END")
        if end_idx == -1:
            raise Exception("Invalid PEM object (missing END tag)")
        end_idx = s.find(b"\n", end_idx) + 1
        if end_idx == 0:
            # There is no final \n
            end_idx = len(s)
        pem_strings.append(s[start_idx:end_idx])
        s = s[end_idx:]
    return pem_strings


class _PKIObj(object):
    """Plain holder for the import format and the DER/PEM encodings."""

    def __init__(self, frmt, der, pem):
        # Note that changing attributes of the _PKIObj does not update these
        # values (e.g. modifying k.modulus does not change k.der).
        # XXX use __setattr__ for this
        self.frmt = frmt
        self.der = der
        self.pem = pem

    def __str__(self):
        # NOTE(review): 'der' is bytes when built by _PKIObjMaker, so str()
        # on such an instance raises TypeError. The factories below rebind
        # __class__ right after creation - confirm this is still reachable.
        return self.der


class _PKIObjMaker(type):
    def __call__(cls, obj_path, obj_max_size, pem_marker=None):
        """
        Build a _PKIObj from either a file path or raw DER/PEM data.

        :param obj_path: path to a file, or the encoded object itself.
        :param obj_max_size: maximum accepted file size, in bytes.
        :param pem_marker: PEM label used to build 'pem' from DER input.
        :raises Exception: "Unable to import data" on any failure.
        """
        # This enables transparent DER and PEM-encoded data imports.
        # Note that when importing a PEM file with multiple objects (like ECDSA
        # private keys output by openssl), it will concatenate every object in
        # order to create a 'der' attribute. When converting a 'multi' DER file
        # into a PEM file, though, the PEM attribute will not be valid,
        # because we do not try to identify the class of each object.
        error_msg = "Unable to import data"

        if obj_path is None:
            raise Exception(error_msg)
        obj_path = bytes_encode(obj_path)

        # A NUL byte cannot be part of a path: treat such input as raw data
        if (b'\x00' not in obj_path) and os.path.isfile(obj_path):
            _size = os.path.getsize(obj_path)
            if _size > obj_max_size:
                raise Exception(error_msg)
            try:
                with open(obj_path, "rb") as f:
                    _raw = f.read()
            except Exception:
                raise Exception(error_msg)
        else:
            _raw = obj_path

        try:
            if b"-----BEGIN" in _raw:
                frmt = "PEM"
                pem = _raw
                der_list = split_pem(_raw)
                der = b''.join(map(pem2der, der_list))
            else:
                frmt = "DER"
                der = _raw
                pem = ""
                if pem_marker is not None:
                    pem = der2pem(_raw, pem_marker)
                # type identification may be needed for pem_marker
                # in such case, the pem attribute has to be updated
        except Exception:
            raise Exception(error_msg)

        p = _PKIObj(frmt, der, pem)
        return p


#####################################################################
# PKI objects wrappers
#####################################################################

###############
# Public Keys #
###############

class _PubKeyFactory(_PKIObjMaker):
    """
    Metaclass for PubKey creation.
    It casts the appropriate class on the fly, then fills in
    the appropriate attributes with import_from_asn1pkt() submethod.
    """
    def __call__(cls, key_path=None):

        if key_path is None:
            # No input: generate a fresh key (RSA when called on PubKey)
            obj = type.__call__(cls)
            if cls is PubKey:
                cls = PubKeyRSA
            obj.__class__ = cls
            obj.frmt = "original"
            obj.fill_and_store()
            return obj

        # This deals with the rare RSA 'kx export' call.
        if isinstance(key_path, tuple):
            obj = type.__call__(cls)
            obj.__class__ = PubKeyRSA
            obj.frmt = "tuple"
            obj.import_from_tuple(key_path)
            return obj

        # Now for the usual calls, key_path may be the path to either:
        # _an X509_SubjectPublicKeyInfo, as processed by openssl;
        # _an RSAPublicKey;
        # _an ECDSAPublicKey;
        # _an EdDSAPublicKey.
        obj = _PKIObjMaker.__call__(cls, key_path, _MAX_KEY_SIZE)
        try:
            spki = X509_SubjectPublicKeyInfo(obj.der)
            pubkey = spki.subjectPublicKey
            if isinstance(pubkey, RSAPublicKey):
                obj.__class__ = PubKeyRSA
                obj.import_from_asn1pkt(pubkey)
            elif isinstance(pubkey, ECDSAPublicKey):
                obj.__class__ = PubKeyECDSA
                obj.import_from_der(obj.der)
            elif isinstance(pubkey, EdDSAPublicKey):
                obj.__class__ = PubKeyEdDSA
                obj.import_from_der(obj.der)
            else:
                # Handled just below: fall back to a bare RSAPublicKey
                raise ValueError("Unsupported public key type")
            marker = b"PUBLIC KEY"
        except Exception:
            try:
                pubkey = RSAPublicKey(obj.der)
                obj.__class__ = PubKeyRSA
                obj.import_from_asn1pkt(pubkey)
                marker = b"RSA PUBLIC KEY"
            except Exception:
                # We cannot import an ECDSA public key without curve knowledge
                if conf.debug_dissector:
                    raise
                raise Exception("Unable to import public key")

        if obj.frmt == "DER":
            obj.pem = der2pem(obj.der, marker)
        return obj


class PubKey(metaclass=_PubKeyFactory):
    """
    Parent class for both PubKeyRSA and PubKeyECDSA.
    Provides a common verifyCert() method.
    """

    def verifyCert(self, cert):
        """ Verifies either a Cert or an X509_Cert. """
        tbsCert = cert.tbsCertificate
        sigAlg = tbsCert.signature
        # The hash is the one advertised inside the signed part of the cert
        h = hash_by_oid[sigAlg.algorithm.val]
        sigVal = raw(cert.signatureValue)
        return self.verify(raw(tbsCert), sigVal, h=h, t='pkcs')


class PubKeyRSA(PubKey, _EncryptAndVerifyRSA):
    """
    Wrapper for RSA keys based on _EncryptAndVerifyRSA from crypto/pkcs1.py
    Use the 'pubkey' attribute to access original object.
    """
    @crypto_validator
    def fill_and_store(self, modulus=None, modulusLen=None, pubExp=None):
        """
        Store the public key built from 'modulus' and 'pubExp', or generate
        a new key of 'modulusLen' bits (default 2048) if no modulus is given.
        """
        pubExp = pubExp or 65537
        if not modulus:
            real_modulusLen = modulusLen or 2048
            if real_modulusLen < 1024 and not _RSA_512_SUPPORTED:
                # cryptography > 43.0 compatibility
                private_key = rust_openssl.rsa.generate_private_key(
                    public_exponent=pubExp,
                    key_size=real_modulusLen,
                )
            else:
                private_key = rsa.generate_private_key(
                    public_exponent=pubExp,
                    key_size=real_modulusLen,
                    backend=default_backend(),
                )
            self.pubkey = private_key.public_key()
        else:
            real_modulusLen = len(binrepr(modulus))
            if modulusLen and real_modulusLen != modulusLen:
                warning("modulus and modulusLen do not match!")
            pubNum = rsa.RSAPublicNumbers(n=modulus, e=pubExp)
            self.pubkey = pubNum.public_key(default_backend())
        # Lines below are only useful for the legacy part of pkcs1.py
        pubNum = self.pubkey.public_numbers()
        self._modulusLen = real_modulusLen
        self._modulus = pubNum.n
        self._pubExp = pubNum.e

    @crypto_validator
    def import_from_tuple(self, tup):
        """Import from an (exponent, modulus, modulusLen) tuple."""
        # this is rarely used
        e, m, mLen = tup
        if isinstance(m, bytes):
            m = pkcs_os2ip(m)
        if isinstance(e, bytes):
            e = pkcs_os2ip(e)
        self.fill_and_store(modulus=m, pubExp=e)
        self.pem = self.pubkey.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo)
        self.der = pem2der(self.pem)

    def import_from_asn1pkt(self, pubkey):
        """Import from a parsed RSAPublicKey ASN.1 packet."""
        modulus = pubkey.modulus.val
        pubExp = pubkey.publicExponent.val
        self.fill_and_store(modulus=modulus, pubExp=pubExp)

    def encrypt(self, msg, t="pkcs", h="sha256", mgf=None, L=None):
        # no ECDSA encryption support, hence no ECDSA specific keywords here
        return _EncryptAndVerifyRSA.encrypt(self, msg, t=t, h=h, mgf=mgf, L=L)

    def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None):
        return _EncryptAndVerifyRSA.verify(
            self, msg, sig, t=t, h=h, mgf=mgf, L=L)


class PubKeyECDSA(PubKey):
    """
    Wrapper for ECDSA keys based on the cryptography library.
    Use the 'pubkey' attribute to access original object.
    """
    @crypto_validator
    def fill_and_store(self, curve=None):
        """Generate a new key on 'curve' (default SECP256R1)."""
        curve = curve or ec.SECP256R1
        private_key = ec.generate_private_key(curve(), default_backend())
        self.pubkey = private_key.public_key()

    @crypto_validator
    def import_from_der(self, pubkey):
        """Load the public key from DER-encoded bytes."""
        # No lib support for explicit curves nor compressed points.
        self.pubkey = serialization.load_der_public_key(
            pubkey,
            backend=default_backend(),
        )

    def encrypt(self, msg, h="sha256", **kwargs):
        raise Exception("No ECDSA encryption support")

    @crypto_validator
    def verify(self, msg, sig, h="sha256", **kwargs):
        """Return True iff 'sig' is a valid signature of 'msg'."""
        # 'sig' should be a DER-encoded signature, as per RFC 3279
        try:
            self.pubkey.verify(sig, msg, ec.ECDSA(_get_hash(h)))
            return True
        except InvalidSignature:
            return False


class PubKeyEdDSA(PubKey):
    """
    Wrapper for EdDSA keys based on the cryptography library.
    Use the 'pubkey' attribute to access original object.
    """
    @crypto_validator
    def fill_and_store(self, curve=None):
        """Generate a new key with 'curve' (a private key class)."""
        # NOTE(review): the default is an X25519 (key exchange) class;
        # confirm that it is suitable for the verify() call below.
        curve = curve or x25519.X25519PrivateKey
        private_key = curve.generate()
        self.pubkey = private_key.public_key()

    @crypto_validator
    def import_from_der(self, pubkey):
        """Load the public key from DER-encoded bytes."""
        self.pubkey = serialization.load_der_public_key(
            pubkey,
            backend=default_backend(),
        )

    def encrypt(self, msg, **kwargs):
        raise Exception("No EdDSA encryption support")

    @crypto_validator
    def verify(self, msg, sig, **kwargs):
        """Return True iff 'sig' is a valid signature of 'msg'."""
        # 'sig' is handed to the library as-is
        try:
            self.pubkey.verify(sig, msg)
            return True
        except InvalidSignature:
            return False


################
# Private Keys #
################

class _PrivKeyFactory(_PKIObjMaker):
    """
    Metaclass for PrivKey creation.
    It casts the appropriate class on the fly, then fills in
    the appropriate attributes with import_from_asn1pkt() submethod.
    """
    def __call__(cls, key_path=None):
        """
        key_path may be the path to either:
            _an RSAPrivateKey_OpenSSL (as generated by openssl);
            _an ECDSAPrivateKey_OpenSSL (as generated by openssl);
            _an RSAPrivateKey;
            _an ECDSAPrivateKey.
        """
        if key_path is None:
            # No input: generate a fresh key (ECDSA when called on PrivKey)
            obj = type.__call__(cls)
            if cls is PrivKey:
                cls = PrivKeyECDSA
            obj.__class__ = cls
            obj.frmt = "original"
            obj.fill_and_store()
            return obj

        obj = _PKIObjMaker.__call__(cls, key_path, _MAX_KEY_SIZE)

        # Parsers are tried in order; the first one accepting the data wins.
        # Each entry: (ASN.1 parser, wrapper class, PEM marker, and whether
        # the actual key sits in the 'privateKey' field of the parsed packet)
        candidates = (
            (RSAPrivateKey_OpenSSL, PrivKeyRSA, b"PRIVATE KEY", True),
            (ECDSAPrivateKey_OpenSSL, PrivKeyECDSA, b"EC PRIVATE KEY", True),
            (RSAPrivateKey, PrivKeyRSA, b"RSA PRIVATE KEY", False),
            (ECDSAPrivateKey, PrivKeyECDSA, b"EC PRIVATE KEY", False),
            (EdDSAPrivateKey, PrivKeyEdDSA, b"PRIVATE KEY", False),
        )
        multiPEM = False
        for parser, wrapper, marker, is_openssl in candidates:
            try:
                privkey = parser(obj.der)
                if is_openssl:
                    privkey = privkey.privateKey
                obj.__class__ = wrapper
            except Exception:
                continue
            # openssl EC files carry an extra EC PARAMETERS object
            multiPEM = parser is ECDSAPrivateKey_OpenSSL
            break
        else:
            raise Exception("Unable to import private key")

        try:
            obj.import_from_asn1pkt(privkey)
        except ImportError:
            # Deliberate best effort: the wrapper is still returned when
            # the import step reports an ImportError.
            pass

        if obj.frmt == "DER":
            if multiPEM:
                # this does not restore the EC PARAMETERS header
                obj.pem = der2pem(raw(privkey), marker)
            else:
                obj.pem = der2pem(obj.der, marker)
        return obj


class _Raw_ASN1_BIT_STRING(ASN1_BIT_STRING):
    """A ASN1_BIT_STRING that ignores BER encoding"""
    def __bytes__(self):
        return self.val_readable
    __str__ = __bytes__


class PrivKey(metaclass=_PrivKeyFactory):
    """
    Parent class for both PrivKeyRSA and PrivKeyECDSA.
    Provides common signTBSCert() and resignCert() methods.
    """

    def signTBSCert(self, tbsCert, h="sha256"):
        """
        Note that this will always copy the signature field from the
        tbsCertificate into the signatureAlgorithm field of the result,
        regardless of the coherence between its contents (which might
        indicate ecdsa-with-SHA512) and the result (e.g. RSA signing MD2).

        There is a small inheritance trick for the computation of sigVal
        below: in order to use a sign() method which would apply
        to both PrivKeyRSA and PrivKeyECDSA, the sign() methods of the
        subclasses accept any argument, be it from the RSA or ECDSA world,
        and then they keep the ones they're interested in.
        Here, t will be passed eventually to pkcs1._DecryptAndSignRSA.sign().
        """
        sigAlg = tbsCert.signature
        # A falsy 'h' means: use the hash advertised by the tbsCertificate
        h = h or hash_by_oid[sigAlg.algorithm.val]
        sigVal = self.sign(raw(tbsCert), h=h, t='pkcs')
        c = X509_Cert()
        c.tbsCertificate = tbsCert
        c.signatureAlgorithm = sigAlg
        c.signatureValue = _Raw_ASN1_BIT_STRING(sigVal, readable=True)
        return c

    def resignCert(self, cert):
        """ Rewrite the signature of either a Cert or an X509_Cert. """
        return self.signTBSCert(cert.tbsCertificate, h=None)

    def verifyCert(self, cert):
        """ Verifies either a Cert or an X509_Cert. """
        tbsCert = cert.tbsCertificate
        sigAlg = tbsCert.signature
        h = hash_by_oid[sigAlg.algorithm.val]
        sigVal = raw(cert.signatureValue)
        return self.verify(raw(tbsCert), sigVal, h=h, t='pkcs')


class PrivKeyRSA(PrivKey, _EncryptAndVerifyRSA, _DecryptAndSignRSA):
    """
    Wrapper for RSA keys based on _DecryptAndSignRSA from crypto/pkcs1.py
    Use the 'key' attribute to access original object.
    """
    @crypto_validator
    def fill_and_store(self, modulus=None, modulusLen=None, pubExp=None,
                       prime1=None, prime2=None, coefficient=None,
                       exponent1=None, exponent2=None, privExp=None):
        """
        Store the key built from the provided RSA parameters, or generate
        a new key of 'modulusLen' bits (default 2048) if any is missing.
        """
        pubExp = pubExp or 65537
        if None in [modulus, prime1, prime2, coefficient, privExp,
                    exponent1, exponent2]:
            # note that the library requires every parameter
            # in order to call RSAPrivateNumbers(...)
            # if one of these is missing, we generate a whole new key
            real_modulusLen = modulusLen or 2048
            if real_modulusLen < 1024 and not _RSA_512_SUPPORTED:
                # cryptography > 43.0 compatibility
                self.key = rust_openssl.rsa.generate_private_key(
                    public_exponent=pubExp,
                    key_size=real_modulusLen,
                )
            else:
                self.key = rsa.generate_private_key(
                    public_exponent=pubExp,
                    key_size=real_modulusLen,
                    backend=default_backend(),
                )
            self.pubkey = self.key.public_key()
        else:
            real_modulusLen = len(binrepr(modulus))
            if modulusLen and real_modulusLen != modulusLen:
                warning("modulus and modulusLen do not match!")
            pubNum = rsa.RSAPublicNumbers(n=modulus, e=pubExp)
            privNum = rsa.RSAPrivateNumbers(p=prime1, q=prime2,
                                            dmp1=exponent1, dmq1=exponent2,
                                            iqmp=coefficient, d=privExp,
                                            public_numbers=pubNum)
            self.key = privNum.private_key(default_backend())
            self.pubkey = self.key.public_key()

        # Lines below are only useful for the legacy part of pkcs1.py
        pubNum = self.pubkey.public_numbers()
        self._modulusLen = real_modulusLen
        self._modulus = pubNum.n
        self._pubExp = pubNum.e

    def import_from_asn1pkt(self, privkey):
        """Import from a parsed RSAPrivateKey ASN.1 packet."""
        modulus = privkey.modulus.val
        pubExp = privkey.publicExponent.val
        privExp = privkey.privateExponent.val
        prime1 = privkey.prime1.val
        prime2 = privkey.prime2.val
        exponent1 = privkey.exponent1.val
        exponent2 = privkey.exponent2.val
        coefficient = privkey.coefficient.val
        self.fill_and_store(modulus=modulus, pubExp=pubExp,
                            privExp=privExp, prime1=prime1, prime2=prime2,
                            exponent1=exponent1, exponent2=exponent2,
                            coefficient=coefficient)

    def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None):
        # Let's copy this from PubKeyRSA instead of adding another baseclass :)
        return _EncryptAndVerifyRSA.verify(
            self, msg, sig, t=t, h=h, mgf=mgf, L=L)

    def sign(self, data, t="pkcs", h="sha256", mgf=None, L=None):
        return _DecryptAndSignRSA.sign(self, data, t=t, h=h, mgf=mgf, L=L)


class PrivKeyECDSA(PrivKey):
    """
    Wrapper for ECDSA keys based on the cryptography library.
    Use the 'key' attribute to access original object.
    """
    @crypto_validator
    def fill_and_store(self, curve=None):
        """Generate a new key on 'curve' (default SECP256R1)."""
        curve = curve or ec.SECP256R1
        self.key = ec.generate_private_key(curve(), default_backend())
        self.pubkey = self.key.public_key()

    @crypto_validator
    def import_from_asn1pkt(self, privkey):
        """Load the key from the DER encoding of an ASN.1 packet."""
        self.key = serialization.load_der_private_key(raw(privkey), None,
                                                      backend=default_backend())  # noqa: E501
        self.pubkey = self.key.public_key()

    @crypto_validator
    def verify(self, msg, sig, h="sha256", **kwargs):
        """Return True iff 'sig' is a valid signature of 'msg'."""
        # 'sig' should be a DER-encoded signature, as per RFC 3279
        try:
            self.pubkey.verify(sig, msg, ec.ECDSA(_get_hash(h)))
            return True
        except InvalidSignature:
            return False

    @crypto_validator
    def sign(self, data, h="sha256", **kwargs):
        return self.key.sign(data, ec.ECDSA(_get_hash(h)))


class PrivKeyEdDSA(PrivKey):
    """
    Wrapper for EdDSA keys
    Use the 'key' attribute to access original object.
    """
    @crypto_validator
    def fill_and_store(self, curve=None):
        """Generate a new key with 'curve' (a private key class)."""
        # NOTE(review): the default is an X25519 (key exchange) class;
        # confirm that it is suitable for sign() and verify() below.
        curve = curve or x25519.X25519PrivateKey
        self.key = curve.generate()
        self.pubkey = self.key.public_key()

    @crypto_validator
    def import_from_asn1pkt(self, privkey):
        """Load the key from the DER encoding of an ASN.1 packet."""
        self.key = serialization.load_der_private_key(raw(privkey), None,
                                                      backend=default_backend())  # noqa: E501
        self.pubkey = self.key.public_key()

    @crypto_validator
    def verify(self, msg, sig, **kwargs):
        """Return True iff 'sig' is a valid signature of 'msg'."""
        # 'sig' is handed to the library as-is
        try:
            self.pubkey.verify(sig, msg)
            return True
        except InvalidSignature:
            return False

    @crypto_validator
    def sign(self, data, **kwargs):
        return self.key.sign(data)


################
# Certificates #
################

class _CertMaker(_PKIObjMaker):
    """
    Metaclass for Cert creation. It is not necessary as it was for the keys,
    but we reuse the model instead of creating redundant constructors.
    """
    def __call__(cls, cert_path):
        obj = _PKIObjMaker.__call__(cls, cert_path,
                                    _MAX_CERT_SIZE, "CERTIFICATE")
        obj.__class__ = Cert
        try:
            cert = X509_Cert(obj.der)
        except Exception:
            if conf.debug_dissector:
                raise
            raise Exception("Unable to import certificate")
        obj.import_from_asn1pkt(cert)
        return obj


class Cert(metaclass=_CertMaker):
    """
    Wrapper for the X509_Cert from layers/x509.py.
    Use the 'x509Cert' attribute to access original object.
    """

    def import_from_asn1pkt(self, cert):
        """
        Collect the attributes of a parsed X509_Cert into this wrapper.

        Note that 'cA', 'keyUsage' and 'extKeyUsage' are only set when the
        matching extension is present in the certificate.
        """
        error_msg = "Unable to import certificate"

        self.x509Cert = cert

        tbsCert = cert.tbsCertificate
        self.tbsCertificate = tbsCert

        if tbsCert.version:
            self.version = tbsCert.version.val + 1
        else:
            self.version = 1
        self.serial = tbsCert.serialNumber.val
        self.sigAlg = tbsCert.signature.algorithm.oidname
        self.issuer = tbsCert.get_issuer()
        self.issuer_str = tbsCert.get_issuer_str()
        self.issuer_hash = hash(self.issuer_str)
        self.subject = tbsCert.get_subject()
        self.subject_str = tbsCert.get_subject_str()
        self.subject_hash = hash(self.subject_str)
        self.authorityKeyID = None

        self.notBefore_str = tbsCert.validity.not_before.pretty_time
        try:
            self.notBefore = tbsCert.validity.not_before.datetime.timetuple()
        except ValueError:
            raise Exception(error_msg)
        self.notBefore_str_simple = time.strftime("%x", self.notBefore)

        self.notAfter_str = tbsCert.validity.not_after.pretty_time
        try:
            self.notAfter = tbsCert.validity.not_after.datetime.timetuple()
        except ValueError:
            raise Exception(error_msg)
        self.notAfter_str_simple = time.strftime("%x", self.notAfter)

        self.pubKey = PubKey(raw(tbsCert.subjectPublicKeyInfo))

        if tbsCert.extensions:
            for extn in tbsCert.extensions:
                if extn.extnID.oidname == "basicConstraints":
                    self.cA = False
                    if extn.extnValue.cA:
                        self.cA = not (extn.extnValue.cA.val == 0)
                elif extn.extnID.oidname == "keyUsage":
                    self.keyUsage = extn.extnValue.get_keyUsage()
                elif extn.extnID.oidname == "extKeyUsage":
                    self.extKeyUsage = extn.extnValue.get_extendedKeyUsage()
                elif extn.extnID.oidname == "authorityKeyIdentifier":
                    self.authorityKeyID = extn.extnValue.keyIdentifier.val

        self.signatureValue = raw(cert.signatureValue)
        self.signatureLen = len(self.signatureValue)

    def isIssuerCert(self, other):
        """
        True if 'other' issued 'self', i.e.:
          - self.issuer == other.subject
          - self is signed by other
        """
        if self.issuer_hash != other.subject_hash:
            return False
        return other.pubKey.verifyCert(self)

    def isSelfSigned(self):
        """
        Return True if the certificate is self-signed:
          - issuer and subject are the same
          - the signature of the certificate is valid.
        """
        if self.issuer_hash == self.subject_hash:
            return self.isIssuerCert(self)
        return False

    def encrypt(self, msg, t="pkcs", h="sha256", mgf=None, L=None):
        # no ECDSA *encryption* support, hence only RSA specific keywords here
        return self.pubKey.encrypt(msg, t=t, h=h, mgf=mgf, L=L)

    def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None):
        return self.pubKey.verify(msg, sig, t=t, h=h, mgf=mgf, L=L)

    def remainingDays(self, now=None):
        """
        Based on the value of notAfter field, returns the number of
        days the certificate will still be valid. The date used for the
        comparison is the current and local date, as returned by
        time.localtime(), except if 'now' argument is provided another
        one. 'now' argument can be given as either a time tuple or a string
        representing the date. Accepted format for the string version
        are:

         - '%b %d %H:%M:%S %Y %Z' e.g. 'Jan 30 07:38:59 2008 GMT'
         - '%m/%d/%y' e.g. '01/30/08' (less precise)

        If the certificate is no more valid at the date considered, then
        a negative value is returned representing the number of days
        since it has expired.

        The number of days is returned as a float to deal with the unlikely
        case of certificates that are still just valid.
        """
        if now is None:
            now = time.localtime()
        elif isinstance(now, str):
            try:
                if '/' in now:
                    now = time.strptime(now, '%m/%d/%y')
                else:
                    now = time.strptime(now, '%b %d %H:%M:%S %Y %Z')
            except Exception:
                warning("Bad time string provided, will use localtime() instead.")  # noqa: E501
                now = time.localtime()

        now = time.mktime(now)
        nft = time.mktime(self.notAfter)
        diff = (nft - now) / (24. * 3600)
        return diff

    def isRevoked(self, crl_list):
        """
        Given a list of trusted CRL (their signature has already been
        verified with trusted anchors), this function returns True if
        the certificate is marked as revoked by one of those CRL.

        Note that if the Certificate was on hold in a previous CRL and
        is now valid again in a new CRL and both are in the list, it
        will be considered revoked: this is because _all_ CRLs are
        checked (not only the freshest) and revocation status is not
        handled.

        Also note that the check on the issuer is performed on the
        Authority Key Identifier if available in _both_ the CRL and the
        Cert. Otherwise, the issuers are simply compared.
        """
        for c in crl_list:
            # Objects lacking the attribute are treated as having no AKID
            crl_akid = getattr(c, "authorityKeyID", None)
            akid_match = (self.authorityKeyID is not None and
                          crl_akid is not None and
                          self.authorityKeyID == crl_akid)
            if akid_match or self.issuer == c.issuer:
                # Keep looking at the other CRLs when this one does not
                # list the certificate
                if any(self.serial == x[0] for x in c.revoked_cert_serials):
                    return True
        return False

    def export(self, filename, fmt="DER"):
        """
        Export certificate in 'fmt' format (DER or PEM) to file 'filename'
        """
        with open(filename, "wb") as f:
            if fmt == "DER":
                f.write(self.der)
            elif fmt == "PEM":
                f.write(self.pem)

    def show(self):
        print("Serial: %s" % self.serial)
        print("Issuer: " + self.issuer_str)
        print("Subject: " + self.subject_str)
        print("Validity: %s to %s" % (self.notBefore_str, self.notAfter_str))

    def __repr__(self):
        return "[X.509 Cert. Subject:%s, Issuer:%s]" % (self.subject_str, self.issuer_str)  # noqa: E501


################################
# Certificate Revocation Lists #
################################

class _CRLMaker(_PKIObjMaker):
    """
    Metaclass for CRL creation. It is not necessary as it was for the keys,
    but we reuse the model instead of creating redundant constructors.
    """
    def __call__(cls, cert_path):
        obj = _PKIObjMaker.__call__(cls, cert_path, _MAX_CRL_SIZE, "X509 CRL")
        obj.__class__ = CRL
        try:
            crl = X509_CRL(obj.der)
        except Exception:
            raise Exception("Unable to import CRL")
        obj.import_from_asn1pkt(crl)
        return obj


class CRL(metaclass=_CRLMaker):
    """
    Wrapper for the X509_CRL from layers/x509.py.
    Use the 'x509CRL' attribute to access original object.
    """

    def import_from_asn1pkt(self, crl):
        """
        Collect the attributes of a parsed X509_CRL into this wrapper.

        Note that 'number' is only set when the cRLNumber extension is
        present in the CRL.
        """
        error_msg = "Unable to import CRL"

        self.x509CRL = crl

        tbsCertList = crl.tbsCertList
        # Unlike Cert.tbsCertificate, this holds the encoded bytes
        self.tbsCertList = raw(tbsCertList)

        if tbsCertList.version:
            self.version = tbsCertList.version.val + 1
        else:
            self.version = 1
        self.sigAlg = tbsCertList.signature.algorithm.oidname
        self.issuer = tbsCertList.get_issuer()
        self.issuer_str = tbsCertList.get_issuer_str()
        self.issuer_hash = hash(self.issuer_str)
        # Read by Cert.isRevoked(); filled in from the extensions below
        self.authorityKeyID = None

        self.lastUpdate_str = tbsCertList.this_update.pretty_time
        lastUpdate = tbsCertList.this_update.val
        if lastUpdate[-1] == "Z":
            lastUpdate = lastUpdate[:-1]
        try:
            self.lastUpdate = time.strptime(lastUpdate, "%y%m%d%H%M%S")
        except Exception:
            raise Exception(error_msg)
        self.lastUpdate_str_simple = time.strftime("%x", self.lastUpdate)

        # nextUpdate is optional: always define the attributes so that
        # show() works on CRLs without it
        self.nextUpdate = None
        self.nextUpdate_str = None
        self.nextUpdate_str_simple = None
        if tbsCertList.next_update:
            self.nextUpdate_str = tbsCertList.next_update.pretty_time
            nextUpdate = tbsCertList.next_update.val
            if nextUpdate[-1] == "Z":
                nextUpdate = nextUpdate[:-1]
            try:
                self.nextUpdate = time.strptime(nextUpdate, "%y%m%d%H%M%S")
            except Exception:
                raise Exception(error_msg)
            self.nextUpdate_str_simple = time.strftime("%x", self.nextUpdate)

        if tbsCertList.crlExtensions:
            for extension in tbsCertList.crlExtensions:
                if extension.extnID.oidname == "cRLNumber":
                    self.number = extension.extnValue.cRLNumber.val
                elif extension.extnID.oidname == "authorityKeyIdentifier":
                    keyIdentifier = extension.extnValue.keyIdentifier
                    if keyIdentifier is not None:
                        self.authorityKeyID = keyIdentifier.val

        revoked = []
        if tbsCertList.revokedCertificates:
            for cert in tbsCertList.revokedCertificates:
                serial = cert.serialNumber.val
                date = cert.revocationDate.val
                if date[-1] == "Z":
                    date = date[:-1]
                try:
                    # Only used to validate the date format
                    time.strptime(date, "%y%m%d%H%M%S")
                except Exception:
                    raise Exception(error_msg)
                revoked.append((serial, date))
        self.revoked_cert_serials = revoked

        self.signatureValue = raw(crl.signatureValue)
        self.signatureLen = len(self.signatureValue)

    def isIssuerCert(self, other):
        """
        True if 'other' issued 'self', i.e.:
          - self.issuer == other.subject
          - self is signed by other
        """
        if self.issuer_hash != other.subject_hash:
            return False
        # verifyCert() reads a 'tbsCertificate' attribute, which a CRL does
        # not have: verify over the encoded tbsCertList instead.
        sigAlg = self.x509CRL.tbsCertList.signature
        h = hash_by_oid[sigAlg.algorithm.val]
        return other.pubKey.verify(self.tbsCertList, self.signatureValue,
                                   h=h, t='pkcs')

    def verify(self, anchors):
        # Return True iff the CRL is signed by one of the provided anchors.
        return any(self.isIssuerCert(a) for a in anchors)

    def show(self):
        print("Version: %d" % self.version)
        print("sigAlg: " + self.sigAlg)
        print("Issuer: " + self.issuer_str)
        print("lastUpdate: %s" % self.lastUpdate_str)
        print("nextUpdate: %s" % self.nextUpdate_str)


######################
# Certificate chains #
######################

class Chain(list):
    """
    Basically, an enhanced array of Cert.
    """

    def __init__(self, certList, cert0=None):
        """
        Construct a chain of certificates starting with a self-signed
        certificate (or any certificate submitted by the user)
        and following issuer/subject matching and signature validity.
        If there is exactly one chain to be constructed, it will be,
        but if there are multiple potential chains, there is no guarantee
        that the retained one will be the longest one.
        As Cert and CRL classes both share an isIssuerCert() method,
        the trailing element of a Chain may alternatively be a CRL.

        Note that we do not check AKID/{SKID/issuer/serial} matching,
        nor the presence of keyCertSign in keyUsage extension (if present).
        """
        list.__init__(self, ())
        # Work on a copy: candidates get removed as they join the chain,
        # and the caller's list must be left untouched.
        certList = list(certList)
        if cert0:
            self.append(cert0)
        else:
            for root_candidate in certList:
                if root_candidate.isSelfSigned():
                    self.append(root_candidate)
                    certList.remove(root_candidate)
                    break

        if len(self) > 0:
            while certList:
                tmp_len = len(self)
                for c in certList:
                    if c.isIssuerCert(self[-1]):
                        self.append(c)
                        certList.remove(c)
                        break
                if len(self) == tmp_len:
                    # no new certificate appended to self
                    break

    def verifyChain(self, anchors, untrusted=None):
        """
        Perform verification of certificate chains for that certificate.
        A list of anchors is required. The certificates in the optional
        untrusted list may be used as additional elements to the final chain.
        On par with chain instantiation, only one chain constructed with the
        untrusted candidates will be retained. Eventually, dates are checked.
        """
        untrusted = untrusted or []
        for a in anchors:
            chain = Chain(self + untrusted, a)
            if len(chain) == 1:             # anchor only
                continue
            # check that the chain does not exclusively rely on untrusted
            if any(c in chain[1:] for c in self):
                for c in chain:
                    if c.remainingDays() < 0:
                        break
                    if c is chain[-1]:      # we got to the end of the chain
                        return chain
        return None

    def verifyChainFromCAFile(self, cafile, untrusted_file=None):
        """
        Does the same job as .verifyChain() but using the list of anchors
        from the cafile. As for .verifyChain(), a list of untrusted
        certificates can be passed (as a file, this time).
        """
        try:
            with open(cafile, "rb") as f:
                ca_certs = f.read()
        except Exception:
            raise Exception("Could not read from cafile")

        anchors = [Cert(c) for c in split_pem(ca_certs)]

        untrusted = None
        if untrusted_file:
            try:
                with open(untrusted_file, "rb") as f:
                    untrusted_certs = f.read()
            except Exception:
                raise Exception("Could not read from untrusted_file")
            untrusted = [Cert(c) for c in split_pem(untrusted_certs)]

        return self.verifyChain(anchors, untrusted)

    def verifyChainFromCAPath(self, capath, untrusted_file=None):
        """
        Does the same job as .verifyChainFromCAFile() but using the list
        of anchors in capath directory. The directory should (only) contain
        certificates files in PEM format. As for .verifyChainFromCAFile(),
        a list of untrusted certificates can be passed as a file
        (concatenation of the certificates in PEM format).
        """
        try:
            anchors = []
            for cafile in os.listdir(capath):
                with open(os.path.join(capath, cafile), "rb") as fd:
                    anchors.append(Cert(fd.read()))
        except Exception:
            raise Exception("capath provided is not a valid cert path")

        untrusted = None
        if untrusted_file:
            try:
                with open(untrusted_file, "rb") as f:
                    untrusted_certs = f.read()
            except Exception:
                raise Exception("Could not read from untrusted_file")
            untrusted = [Cert(c) for c in split_pem(untrusted_certs)]

        return self.verifyChain(anchors, untrusted)

    def __repr__(self):
        llen = len(self) - 1
        if llen < 0:
            return ""
        c = self[0]
        s = "__ "
        if not c.isSelfSigned():
            s += "%s [Not Self Signed]\n" % c.subject_str
        else:
            s += "%s [Self Signed]\n" % c.subject_str
        idx = 1
        while idx <= llen:
            c = self[idx]
            s += "%s_ %s" % (" " * idx * 2, c.subject_str)
            if idx != llen:
                s += "\n"
            idx += 1
        return s