1## This file is part of Scapy 2## Copyright (C) 2008 Arnaud Ebalard <arnaud.ebalard@eads.net> 3## <arno@natisbad.org> 4## 2015, 2016, 2017 Maxence Tury <maxence.tury@ssi.gouv.fr> 5## This program is published under a GPLv2 license 6 7""" 8High-level methods for PKI objects (X.509 certificates, CRLs, asymmetric keys). 9Supports both RSA and ECDSA objects. 10 11The classes below are wrappers for the ASN.1 objects defined in x509.py. 12By collecting their attributes, we bypass the ASN.1 structure, hence 13there is no direct method for exporting a new full DER-encoded version 14of a Cert instance after its serial has been modified (for example). 15If you need to modify an import, just use the corresponding ASN1_Packet. 16 17For instance, here is what you could do in order to modify the serial of 18'cert' and then resign it with whatever 'key': 19 f = open('cert.der') 20 c = X509_Cert(f.read()) 21 c.tbsCertificate.serialNumber = 0x4B1D 22 k = PrivKey('key.pem') 23 new_x509_cert = k.resignCert(c) 24No need for obnoxious openssl tweaking anymore. :) 25""" 26 27from __future__ import absolute_import 28from __future__ import print_function 29import base64 30import os 31import time 32 33from scapy.config import conf, crypto_validator 34import scapy.modules.six as six 35from scapy.modules.six.moves import range 36if conf.crypto_valid: 37 from cryptography.hazmat.backends import default_backend 38 from cryptography.hazmat.primitives import serialization 39 from cryptography.hazmat.primitives.asymmetric import rsa 40 41from scapy.error import warning 42from scapy.utils import binrepr 43from scapy.asn1.asn1 import ASN1_BIT_STRING 44from scapy.asn1.mib import hash_by_oid 45from scapy.layers.x509 import (X509_SubjectPublicKeyInfo, 46 RSAPublicKey, RSAPrivateKey, 47 ECDSAPublicKey, ECDSAPrivateKey, 48 RSAPrivateKey_OpenSSL, ECDSAPrivateKey_OpenSSL, 49 X509_Cert, X509_CRL) 50from scapy.layers.tls.crypto.pkcs1 import (pkcs_os2ip, pkcs_i2osp, _get_hash, 51 _EncryptAndVerifyRSA, 52 _DecryptAndSignRSA) 53 54from scapy.compat import * 55 56# Maximum allowed size in bytes for a certificate file, to avoid 57# loading huge file when importing a cert 58_MAX_KEY_SIZE = 50*1024 59_MAX_CERT_SIZE = 50*1024 60_MAX_CRL_SIZE = 10*1024*1024 # some are that big 61 62 63##################################################################### 64# Some helpers 65##################################################################### 66 67@conf.commands.register 68def der2pem(der_string, obj="UNKNOWN"): 69 """Convert DER octet string to PEM format (with optional header)""" 70 # Encode a byte string in PEM format. Header advertizes <obj> type. 71 pem_string = ("-----BEGIN %s-----\n" % obj).encode() 72 base64_string = base64.b64encode(der_string) 73 chunks = [base64_string[i:i+64] for i in range(0, len(base64_string), 64)] 74 pem_string += b'\n'.join(chunks) 75 pem_string += ("\n-----END %s-----\n" % obj).encode() 76 return pem_string 77 78@conf.commands.register 79def pem2der(pem_string): 80 """Convert PEM string to DER format""" 81 # Encode all lines between the first '-----\n' and the 2nd-to-last '-----'. 82 pem_string = pem_string.replace(b"\r", b"") 83 first_idx = pem_string.find(b"-----\n") + 6 84 if pem_string.find(b"-----BEGIN", first_idx) != -1: 85 raise Exception("pem2der() expects only one PEM-encoded object") 86 last_idx = pem_string.rfind(b"-----", 0, pem_string.rfind(b"-----")) 87 base64_string = pem_string[first_idx:last_idx] 88 base64_string.replace(b"\n", b"") 89 der_string = base64.b64decode(base64_string) 90 return der_string 91 92def split_pem(s): 93 """ 94 Split PEM objects. Useful to process concatenated certificates. 95 """ 96 pem_strings = [] 97 while s != b"": 98 start_idx = s.find(b"-----BEGIN") 99 if start_idx == -1: 100 break 101 end_idx = s.find(b"-----END") 102 end_idx = s.find(b"\n", end_idx) + 1 103 pem_strings.append(s[start_idx:end_idx]) 104 s = s[end_idx:] 105 return pem_strings 106 107 108class _PKIObj(object): 109 def __init__(self, frmt, der, pem): 110 # Note that changing attributes of the _PKIObj does not update these 111 # values (e.g. modifying k.modulus does not change k.der). 112 #XXX use __setattr__ for this 113 self.frmt = frmt 114 self.der = der 115 self.pem = pem 116 117 def __str__(self): 118 return self.der 119 120 121class _PKIObjMaker(type): 122 def __call__(cls, obj_path, obj_max_size, pem_marker=None): 123 # This enables transparent DER and PEM-encoded data imports. 124 # Note that when importing a PEM file with multiple objects (like ECDSA 125 # private keys output by openssl), it will concatenate every object in 126 # order to create a 'der' attribute. When converting a 'multi' DER file 127 # into a PEM file, though, the PEM attribute will not be valid, 128 # because we do not try to identify the class of each object. 129 error_msg = "Unable to import data" 130 131 if obj_path is None: 132 raise Exception(error_msg) 133 obj_path = raw(obj_path) 134 135 if (not b'\x00' in obj_path) and os.path.isfile(obj_path): 136 _size = os.path.getsize(obj_path) 137 if _size > obj_max_size: 138 raise Exception(error_msg) 139 try: 140 f = open(obj_path, "rb") 141 _raw = f.read() 142 f.close() 143 except: 144 raise Exception(error_msg) 145 else: 146 _raw = obj_path 147 148 try: 149 if b"-----BEGIN" in _raw: 150 frmt = "PEM" 151 pem = _raw 152 der_list = split_pem(_raw) 153 der = b''.join(map(pem2der, der_list)) 154 else: 155 frmt = "DER" 156 der = _raw 157 pem = "" 158 if pem_marker is not None: 159 pem = der2pem(_raw, pem_marker) 160 # type identification may be needed for pem_marker 161 # in such case, the pem attribute has to be updated 162 except: 163 raise Exception(error_msg) 164 165 p = _PKIObj(frmt, der, pem) 166 return p 167 168 169##################################################################### 170# PKI objects wrappers 171##################################################################### 172 173############### 174# Public Keys # 175############### 176 177class _PubKeyFactory(_PKIObjMaker): 178 """ 179 Metaclass for PubKey creation. 180 It casts the appropriate class on the fly, then fills in 181 the appropriate attributes with import_from_asn1pkt() submethod. 182 """ 183 def __call__(cls, key_path=None): 184 185 if key_path is None: 186 obj = type.__call__(cls) 187 if cls is PubKey: 188 cls = PubKeyRSA 189 obj.__class__ = cls 190 obj.frmt = "original" 191 obj.fill_and_store() 192 return obj 193 194 # This deals with the rare RSA 'kx export' call. 195 if isinstance(key_path, tuple): 196 obj = type.__call__(cls) 197 obj.__class__ = PubKeyRSA 198 obj.frmt = "tuple" 199 obj.import_from_tuple(key_path) 200 return obj 201 202 # Now for the usual calls, key_path may be the path to either: 203 # _an X509_SubjectPublicKeyInfo, as processed by openssl; 204 # _an RSAPublicKey; 205 # _an ECDSAPublicKey. 206 obj = _PKIObjMaker.__call__(cls, key_path, _MAX_KEY_SIZE) 207 try: 208 spki = X509_SubjectPublicKeyInfo(obj.der) 209 pubkey = spki.subjectPublicKey 210 if isinstance(pubkey, RSAPublicKey): 211 obj.__class__ = PubKeyRSA 212 obj.import_from_asn1pkt(pubkey) 213 elif isinstance(pubkey, ECDSAPublicKey): 214 obj.__class__ = PubKeyECDSA 215 try: 216 obj.import_from_der(obj.der) 217 except ImportError: 218 pass 219 else: 220 raise 221 marker = b"PUBLIC KEY" 222 except: 223 try: 224 pubkey = RSAPublicKey(obj.der) 225 obj.__class__ = PubKeyRSA 226 obj.import_from_asn1pkt(pubkey) 227 marker = b"RSA PUBLIC KEY" 228 except: 229 # We cannot import an ECDSA public key without curve knowledge 230 raise Exception("Unable to import public key") 231 232 if obj.frmt == "DER": 233 obj.pem = der2pem(obj.der, marker) 234 return obj 235 236 237class PubKey(six.with_metaclass(_PubKeyFactory, object)): 238 """ 239 Parent class for both PubKeyRSA and PubKeyECDSA. 240 Provides a common verifyCert() method. 241 """ 242 243 def verifyCert(self, cert): 244 """ Verifies either a Cert or an X509_Cert. """ 245 tbsCert = cert.tbsCertificate 246 sigAlg = tbsCert.signature 247 h = hash_by_oid[sigAlg.algorithm.val] 248 sigVal = raw(cert.signatureValue) 249 return self.verify(raw(tbsCert), sigVal, h=h, t='pkcs') 250 251 252class PubKeyRSA(PubKey, _EncryptAndVerifyRSA): 253 """ 254 Wrapper for RSA keys based on _EncryptAndVerifyRSA from crypto/pkcs1.py 255 Use the 'key' attribute to access original object. 256 """ 257 @crypto_validator 258 def fill_and_store(self, modulus=None, modulusLen=None, pubExp=None): 259 pubExp = pubExp or 65537 260 if not modulus: 261 real_modulusLen = modulusLen or 2048 262 private_key = rsa.generate_private_key(public_exponent=pubExp, 263 key_size=real_modulusLen, 264 backend=default_backend()) 265 self.pubkey = private_key.public_key() 266 else: 267 real_modulusLen = len(binrepr(modulus)) 268 if modulusLen and real_modulusLen != modulusLen: 269 warning("modulus and modulusLen do not match!") 270 pubNum = rsa.RSAPublicNumbers(n=modulus, e=pubExp) 271 self.pubkey = pubNum.public_key(default_backend()) 272 # Lines below are only useful for the legacy part of pkcs1.py 273 pubNum = self.pubkey.public_numbers() 274 self._modulusLen = real_modulusLen 275 self._modulus = pubNum.n 276 self._pubExp = pubNum.e 277 278 @crypto_validator 279 def import_from_tuple(self, tup): 280 # this is rarely used 281 e, m, mLen = tup 282 if isinstance(m, bytes): 283 m = pkcs_os2ip(m) 284 if isinstance(e, bytes): 285 e = pkcs_os2ip(e) 286 self.fill_and_store(modulus=m, pubExp=e) 287 self.pem = self.pubkey.public_bytes( 288 encoding=serialization.Encoding.PEM, 289 format=serialization.PublicFormat.SubjectPublicKeyInfo) 290 self.der = pem2der(self.pem) 291 292 def import_from_asn1pkt(self, pubkey): 293 modulus = pubkey.modulus.val 294 pubExp = pubkey.publicExponent.val 295 self.fill_and_store(modulus=modulus, pubExp=pubExp) 296 297 def encrypt(self, msg, t="pkcs", h="sha256", mgf=None, L=None): 298 # no ECDSA encryption support, hence no ECDSA specific keywords here 299 return _EncryptAndVerifyRSA.encrypt(self, msg, t, h, mgf, L) 300 301 def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None): 302 return _EncryptAndVerifyRSA.verify(self, msg, sig, t, h, mgf, L) 303 304class PubKeyECDSA(PubKey): 305 """ 306 Wrapper for ECDSA keys based on the cryptography library. 307 Use the 'key' attribute to access original object. 308 """ 309 @crypto_validator 310 def fill_and_store(self, curve=None): 311 curve = curve or ec.SECP256R1 312 private_key = ec.generate_private_key(curve(), default_backend()) 313 self.pubkey = private_key.public_key() 314 315 @crypto_validator 316 def import_from_der(self, pubkey): 317 # No lib support for explicit curves nor compressed points. 318 self.pubkey = serialization.load_der_public_key(pubkey, 319 backend=default_backend()) 320 321 def encrypt(self, msg, h="sha256", **kwargs): 322 # cryptography lib does not support ECDSA encryption 323 raise Exception("No ECDSA encryption support") 324 325 @crypto_validator 326 def verify(self, msg, sig, h="sha256", **kwargs): 327 # 'sig' should be a DER-encoded signature, as per RFC 3279 328 verifier = self.pubkey.verifier(sig, ec.ECDSA(_get_hash(h))) 329 verifier.update(msg) 330 return verifier.verify() 331 332 333################ 334# Private Keys # 335################ 336 337class _PrivKeyFactory(_PKIObjMaker): 338 """ 339 Metaclass for PrivKey creation. 340 It casts the appropriate class on the fly, then fills in 341 the appropriate attributes with import_from_asn1pkt() submethod. 342 """ 343 def __call__(cls, key_path=None): 344 """ 345 key_path may be the path to either: 346 _an RSAPrivateKey_OpenSSL (as generated by openssl); 347 _an ECDSAPrivateKey_OpenSSL (as generated by openssl); 348 _an RSAPrivateKey; 349 _an ECDSAPrivateKey. 350 """ 351 if key_path is None: 352 obj = type.__call__(cls) 353 if cls is PrivKey: 354 cls = PrivKeyECDSA 355 obj.__class__ = cls 356 obj.frmt = "original" 357 obj.fill_and_store() 358 return obj 359 360 obj = _PKIObjMaker.__call__(cls, key_path, _MAX_KEY_SIZE) 361 multiPEM = False 362 try: 363 privkey = RSAPrivateKey_OpenSSL(obj.der) 364 privkey = privkey.privateKey 365 obj.__class__ = PrivKeyRSA 366 marker = b"PRIVATE KEY" 367 except: 368 try: 369 privkey = ECDSAPrivateKey_OpenSSL(obj.der) 370 privkey = privkey.privateKey 371 obj.__class__ = PrivKeyECDSA 372 marker = b"EC PRIVATE KEY" 373 multiPEM = True 374 except: 375 try: 376 privkey = RSAPrivateKey(obj.der) 377 obj.__class__ = PrivKeyRSA 378 marker = b"RSA PRIVATE KEY" 379 except: 380 try: 381 privkey = ECDSAPrivateKey(obj.der) 382 obj.__class__ = PrivKeyECDSA 383 marker = b"EC PRIVATE KEY" 384 except: 385 raise Exception("Unable to import private key") 386 try: 387 obj.import_from_asn1pkt(privkey) 388 except ImportError: 389 pass 390 391 if obj.frmt == "DER": 392 if multiPEM: 393 # this does not restore the EC PARAMETERS header 394 obj.pem = der2pem(raw(privkey), marker) 395 else: 396 obj.pem = der2pem(obj.der, marker) 397 return obj 398 399 400class PrivKey(six.with_metaclass(_PrivKeyFactory, object)): 401 """ 402 Parent class for both PrivKeyRSA and PrivKeyECDSA. 403 Provides common signTBSCert() and resignCert() methods. 404 """ 405 406 def signTBSCert(self, tbsCert, h="sha256"): 407 """ 408 Note that this will always copy the signature field from the 409 tbsCertificate into the signatureAlgorithm field of the result, 410 regardless of the coherence between its contents (which might 411 indicate ecdsa-with-SHA512) and the result (e.g. RSA signing MD2). 412 413 There is a small inheritance trick for the computation of sigVal 414 below: in order to use a sign() method which would apply 415 to both PrivKeyRSA and PrivKeyECDSA, the sign() methods of the 416 subclasses accept any argument, be it from the RSA or ECDSA world, 417 and then they keep the ones they're interested in. 418 Here, t will be passed eventually to pkcs1._DecryptAndSignRSA.sign(). 419 """ 420 sigAlg = tbsCert.signature 421 h = h or hash_by_oid[sigAlg.algorithm.val] 422 sigVal = self.sign(raw(tbsCert), h=h, t='pkcs') 423 c = X509_Cert() 424 c.tbsCertificate = tbsCert 425 c.signatureAlgorithm = sigAlg 426 c.signatureValue = ASN1_BIT_STRING(sigVal, readable=True) 427 return c 428 429 def resignCert(self, cert): 430 """ Rewrite the signature of either a Cert or an X509_Cert. """ 431 return self.signTBSCert(cert.tbsCertificate) 432 433 def verifyCert(self, cert): 434 """ Verifies either a Cert or an X509_Cert. """ 435 tbsCert = cert.tbsCertificate 436 sigAlg = tbsCert.signature 437 h = hash_by_oid[sigAlg.algorithm.val] 438 sigVal = raw(cert.signatureValue) 439 return self.verify(raw(tbsCert), sigVal, h=h, t='pkcs') 440 441 442class PrivKeyRSA(PrivKey, _EncryptAndVerifyRSA, _DecryptAndSignRSA): 443 """ 444 Wrapper for RSA keys based on _DecryptAndSignRSA from crypto/pkcs1.py 445 Use the 'key' attribute to access original object. 446 """ 447 @crypto_validator 448 def fill_and_store(self, modulus=None, modulusLen=None, pubExp=None, 449 prime1=None, prime2=None, coefficient=None, 450 exponent1=None, exponent2=None, privExp=None): 451 pubExp = pubExp or 65537 452 if None in [modulus, prime1, prime2, coefficient, privExp, 453 exponent1, exponent2]: 454 # note that the library requires every parameter 455 # in order to call RSAPrivateNumbers(...) 456 # if one of these is missing, we generate a whole new key 457 real_modulusLen = modulusLen or 2048 458 self.key = rsa.generate_private_key(public_exponent=pubExp, 459 key_size=real_modulusLen, 460 backend=default_backend()) 461 self.pubkey = self.key.public_key() 462 else: 463 real_modulusLen = len(binrepr(modulus)) 464 if modulusLen and real_modulusLen != modulusLen: 465 warning("modulus and modulusLen do not match!") 466 pubNum = rsa.RSAPublicNumbers(n=modulus, e=pubExp) 467 privNum = rsa.RSAPrivateNumbers(p=prime1, q=prime2, 468 dmp1=exponent1, dmq1=exponent2, 469 iqmp=coefficient, d=privExp, 470 public_numbers=pubNum) 471 self.key = privNum.private_key(default_backend()) 472 self.pubkey = self.key.public_key() 473 474 # Lines below are only useful for the legacy part of pkcs1.py 475 pubNum = self.pubkey.public_numbers() 476 self._modulusLen = real_modulusLen 477 self._modulus = pubNum.n 478 self._pubExp = pubNum.e 479 480 def import_from_asn1pkt(self, privkey): 481 modulus = privkey.modulus.val 482 pubExp = privkey.publicExponent.val 483 privExp = privkey.privateExponent.val 484 prime1 = privkey.prime1.val 485 prime2 = privkey.prime2.val 486 exponent1 = privkey.exponent1.val 487 exponent2 = privkey.exponent2.val 488 coefficient = privkey.coefficient.val 489 self.fill_and_store(modulus=modulus, pubExp=pubExp, 490 privExp=privExp, prime1=prime1, prime2=prime2, 491 exponent1=exponent1, exponent2=exponent2, 492 coefficient=coefficient) 493 494 def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None): 495 # Let's copy this from PubKeyRSA instead of adding another baseclass :) 496 return _EncryptAndVerifyRSA.verify(self, msg, sig, t, h, mgf, L) 497 498 def sign(self, data, t="pkcs", h="sha256", mgf=None, L=None): 499 return _DecryptAndSignRSA.sign(self, data, t, h, mgf, L) 500 501 502class PrivKeyECDSA(PrivKey): 503 """ 504 Wrapper for ECDSA keys based on SigningKey from ecdsa library. 505 Use the 'key' attribute to access original object. 506 """ 507 @crypto_validator 508 def fill_and_store(self, curve=None): 509 curve = curve or ec.SECP256R1 510 self.key = ec.generate_private_key(curve(), default_backend()) 511 self.pubkey = self.key.public_key() 512 513 @crypto_validator 514 def import_from_asn1pkt(self, privkey): 515 self.key = serialization.load_der_private_key(raw(privkey), None, 516 backend=default_backend()) 517 self.pubkey = self.key.public_key() 518 519 @crypto_validator 520 def verify(self, msg, sig, h="sha256", **kwargs): 521 # 'sig' should be a DER-encoded signature, as per RFC 3279 522 verifier = self.pubkey.verifier(sig, ec.ECDSA(_get_hash(h))) 523 verifier.update(msg) 524 return verifier.verify() 525 526 @crypto_validator 527 def sign(self, data, h="sha256", **kwargs): 528 signer = self.key.signer(ec.ECDSA(_get_hash(h))) 529 signer.update(data) 530 return signer.finalize() 531 532 533################ 534# Certificates # 535################ 536 537class _CertMaker(_PKIObjMaker): 538 """ 539 Metaclass for Cert creation. It is not necessary as it was for the keys, 540 but we reuse the model instead of creating redundant constructors. 541 """ 542 def __call__(cls, cert_path): 543 obj = _PKIObjMaker.__call__(cls, cert_path, 544 _MAX_CERT_SIZE, "CERTIFICATE") 545 obj.__class__ = Cert 546 try: 547 cert = X509_Cert(obj.der) 548 except: 549 raise Exception("Unable to import certificate") 550 obj.import_from_asn1pkt(cert) 551 return obj 552 553 554class Cert(six.with_metaclass(_CertMaker, object)): 555 """ 556 Wrapper for the X509_Cert from layers/x509.py. 557 Use the 'x509Cert' attribute to access original object. 558 """ 559 560 def import_from_asn1pkt(self, cert): 561 error_msg = "Unable to import certificate" 562 563 self.x509Cert = cert 564 565 tbsCert = cert.tbsCertificate 566 self.tbsCertificate = tbsCert 567 568 if tbsCert.version: 569 self.version = tbsCert.version.val + 1 570 else: 571 self.version = 1 572 self.serial = tbsCert.serialNumber.val 573 self.sigAlg = tbsCert.signature.algorithm.oidname 574 self.issuer = tbsCert.get_issuer() 575 self.issuer_str = tbsCert.get_issuer_str() 576 self.issuer_hash = hash(self.issuer_str) 577 self.subject = tbsCert.get_subject() 578 self.subject_str = tbsCert.get_subject_str() 579 self.subject_hash = hash(self.subject_str) 580 581 self.notBefore_str = tbsCert.validity.not_before.pretty_time 582 notBefore = tbsCert.validity.not_before.val 583 if notBefore[-1] == "Z": 584 notBefore = notBefore[:-1] 585 try: 586 self.notBefore = time.strptime(notBefore, "%y%m%d%H%M%S") 587 except: 588 raise Exception(error_msg) 589 self.notBefore_str_simple = time.strftime("%x", self.notBefore) 590 591 self.notAfter_str = tbsCert.validity.not_after.pretty_time 592 notAfter = tbsCert.validity.not_after.val 593 if notAfter[-1] == "Z": 594 notAfter = notAfter[:-1] 595 try: 596 self.notAfter = time.strptime(notAfter, "%y%m%d%H%M%S") 597 except: 598 raise Exception(error_msg) 599 self.notAfter_str_simple = time.strftime("%x", self.notAfter) 600 601 self.pubKey = PubKey(raw(tbsCert.subjectPublicKeyInfo)) 602 603 if tbsCert.extensions: 604 for extn in tbsCert.extensions: 605 if extn.extnID.oidname == "basicConstraints": 606 self.cA = False 607 if extn.extnValue.cA: 608 self.cA = not (extn.extnValue.cA.val == 0) 609 elif extn.extnID.oidname == "keyUsage": 610 self.keyUsage = extn.extnValue.get_keyUsage() 611 elif extn.extnID.oidname == "extKeyUsage": 612 self.extKeyUsage = extn.extnValue.get_extendedKeyUsage() 613 elif extn.extnID.oidname == "authorityKeyIdentifier": 614 self.authorityKeyID = extn.extnValue.keyIdentifier.val 615 616 self.signatureValue = raw(cert.signatureValue) 617 self.signatureLen = len(self.signatureValue) 618 619 def isIssuerCert(self, other): 620 """ 621 True if 'other' issued 'self', i.e.: 622 - self.issuer == other.subject 623 - self is signed by other 624 """ 625 if self.issuer_hash != other.subject_hash: 626 return False 627 return other.pubKey.verifyCert(self) 628 629 def isSelfSigned(self): 630 """ 631 Return True if the certificate is self-signed: 632 - issuer and subject are the same 633 - the signature of the certificate is valid. 634 """ 635 if self.issuer_hash == self.subject_hash: 636 return self.isIssuerCert(self) 637 return False 638 639 def encrypt(self, msg, t="pkcs", h="sha256", mgf=None, L=None): 640 # no ECDSA *encryption* support, hence only RSA specific keywords here 641 return self.pubKey.encrypt(msg, t, h, mgf, L) 642 643 def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None): 644 return self.pubKey.verify(msg, sig, t, h, mgf, L) 645 646 def remainingDays(self, now=None): 647 """ 648 Based on the value of notAfter field, returns the number of 649 days the certificate will still be valid. The date used for the 650 comparison is the current and local date, as returned by 651 time.localtime(), except if 'now' argument is provided another 652 one. 'now' argument can be given as either a time tuple or a string 653 representing the date. Accepted format for the string version 654 are: 655 656 - '%b %d %H:%M:%S %Y %Z' e.g. 'Jan 30 07:38:59 2008 GMT' 657 - '%m/%d/%y' e.g. '01/30/08' (less precise) 658 659 If the certificate is no more valid at the date considered, then 660 a negative value is returned representing the number of days 661 since it has expired. 662 663 The number of days is returned as a float to deal with the unlikely 664 case of certificates that are still just valid. 665 """ 666 if now is None: 667 now = time.localtime() 668 elif isinstance(now, str): 669 try: 670 if '/' in now: 671 now = time.strptime(now, '%m/%d/%y') 672 else: 673 now = time.strptime(now, '%b %d %H:%M:%S %Y %Z') 674 except: 675 warning("Bad time string provided, will use localtime() instead.") 676 now = time.localtime() 677 678 now = time.mktime(now) 679 nft = time.mktime(self.notAfter) 680 diff = (nft - now)/(24.*3600) 681 return diff 682 683 def isRevoked(self, crl_list): 684 """ 685 Given a list of trusted CRL (their signature has already been 686 verified with trusted anchors), this function returns True if 687 the certificate is marked as revoked by one of those CRL. 688 689 Note that if the Certificate was on hold in a previous CRL and 690 is now valid again in a new CRL and bot are in the list, it 691 will be considered revoked: this is because _all_ CRLs are 692 checked (not only the freshest) and revocation status is not 693 handled. 694 695 Also note that the check on the issuer is performed on the 696 Authority Key Identifier if available in _both_ the CRL and the 697 Cert. Otherwise, the issuers are simply compared. 698 """ 699 for c in crl_list: 700 if (self.authorityKeyID is not None and 701 c.authorityKeyID is not None and 702 self.authorityKeyID == c.authorityKeyID): 703 return self.serial in (x[0] for x in c.revoked_cert_serials) 704 elif self.issuer == c.issuer: 705 return self.serial in (x[0] for x in c.revoked_cert_serials) 706 return False 707 708 def export(self, filename, fmt="DER"): 709 """ 710 Export certificate in 'fmt' format (DER or PEM) to file 'filename' 711 """ 712 f = open(filename, "wb") 713 if fmt == "DER": 714 f.write(self.der) 715 elif fmt == "PEM": 716 f.write(self.pem) 717 f.close() 718 719 def show(self): 720 print("Serial: %s" % self.serial) 721 print("Issuer: " + self.issuer_str) 722 print("Subject: " + self.subject_str) 723 print("Validity: %s to %s" % (self.notBefore_str, self.notAfter_str)) 724 725 def __repr__(self): 726 return "[X.509 Cert. Subject:%s, Issuer:%s]" % (self.subject_str, self.issuer_str) 727 728 729################################ 730# Certificate Revocation Lists # 731################################ 732 733class _CRLMaker(_PKIObjMaker): 734 """ 735 Metaclass for CRL creation. It is not necessary as it was for the keys, 736 but we reuse the model instead of creating redundant constructors. 737 """ 738 def __call__(cls, cert_path): 739 obj = _PKIObjMaker.__call__(cls, cert_path, _MAX_CRL_SIZE, "X509 CRL") 740 obj.__class__ = CRL 741 try: 742 crl = X509_CRL(obj.der) 743 except: 744 raise Exception("Unable to import CRL") 745 obj.import_from_asn1pkt(crl) 746 return obj 747 748 749class CRL(six.with_metaclass(_CRLMaker, object)): 750 """ 751 Wrapper for the X509_CRL from layers/x509.py. 752 Use the 'x509CRL' attribute to access original object. 753 """ 754 755 def import_from_asn1pkt(self, crl): 756 error_msg = "Unable to import CRL" 757 758 self.x509CRL = crl 759 760 tbsCertList = crl.tbsCertList 761 self.tbsCertList = raw(tbsCertList) 762 763 if tbsCertList.version: 764 self.version = tbsCertList.version.val + 1 765 else: 766 self.version = 1 767 self.sigAlg = tbsCertList.signature.algorithm.oidname 768 self.issuer = tbsCertList.get_issuer() 769 self.issuer_str = tbsCertList.get_issuer_str() 770 self.issuer_hash = hash(self.issuer_str) 771 772 self.lastUpdate_str = tbsCertList.this_update.pretty_time 773 lastUpdate = tbsCertList.this_update.val 774 if lastUpdate[-1] == "Z": 775 lastUpdate = lastUpdate[:-1] 776 try: 777 self.lastUpdate = time.strptime(lastUpdate, "%y%m%d%H%M%S") 778 except: 779 raise Exception(error_msg) 780 self.lastUpdate_str_simple = time.strftime("%x", self.lastUpdate) 781 782 self.nextUpdate = None 783 self.nextUpdate_str_simple = None 784 if tbsCertList.next_update: 785 self.nextUpdate_str = tbsCertList.next_update.pretty_time 786 nextUpdate = tbsCertList.next_update.val 787 if nextUpdate[-1] == "Z": 788 nextUpdate = nextUpdate[:-1] 789 try: 790 self.nextUpdate = time.strptime(nextUpdate, "%y%m%d%H%M%S") 791 except: 792 raise Exception(error_msg) 793 self.nextUpdate_str_simple = time.strftime("%x", self.nextUpdate) 794 795 if tbsCertList.crlExtensions: 796 for extension in tbsCertList.crlExtensions: 797 if extension.extnID.oidname == "cRLNumber": 798 self.number = extension.extnValue.cRLNumber.val 799 800 revoked = [] 801 if tbsCertList.revokedCertificates: 802 for cert in tbsCertList.revokedCertificates: 803 serial = cert.serialNumber.val 804 date = cert.revocationDate.val 805 if date[-1] == "Z": 806 date = date[:-1] 807 try: 808 revocationDate = time.strptime(date, "%y%m%d%H%M%S") 809 except: 810 raise Exception(error_msg) 811 revoked.append((serial, date)) 812 self.revoked_cert_serials = revoked 813 814 self.signatureValue = raw(crl.signatureValue) 815 self.signatureLen = len(self.signatureValue) 816 817 def isIssuerCert(self, other): 818 # This is exactly the same thing as in Cert method. 819 if self.issuer_hash != other.subject_hash: 820 return False 821 return other.pubKey.verifyCert(self) 822 823 def verify(self, anchors): 824 # Return True iff the CRL is signed by one of the provided anchors. 825 for a in anchors: 826 if self.isIssuerCert(a): 827 return True 828 return False 829 830 def show(self): 831 print("Version: %d" % self.version) 832 print("sigAlg: " + self.sigAlg) 833 print("Issuer: " + self.issuer_str) 834 print("lastUpdate: %s" % self.lastUpdate_str) 835 print("nextUpdate: %s" % self.nextUpdate_str) 836 837 838###################### 839# Certificate chains # 840###################### 841 842class Chain(list): 843 """ 844 Basically, an enhanced array of Cert. 845 """ 846 def __init__(self, certList, cert0=None): 847 """ 848 Construct a chain of certificates starting with a self-signed 849 certificate (or any certificate submitted by the user) 850 and following issuer/subject matching and signature validity. 851 If there is exactly one chain to be constructed, it will be, 852 but if there are multiple potential chains, there is no guarantee 853 that the retained one will be the longest one. 854 As Cert and CRL classes both share an isIssuerCert() method, 855 the trailing element of a Chain may alternatively be a CRL. 856 857 Note that we do not check AKID/{SKID/issuer/serial} matching, 858 nor the presence of keyCertSign in keyUsage extension (if present). 859 """ 860 list.__init__(self, ()) 861 if cert0: 862 self.append(cert0) 863 else: 864 for root_candidate in certList: 865 if root_candidate.isSelfSigned(): 866 self.append(root_candidate) 867 certList.remove(root_candidate) 868 break 869 870 if len(self) > 0: 871 while certList: 872 l = len(self) 873 for c in certList: 874 if c.isIssuerCert(self[-1]): 875 self.append(c) 876 certList.remove(c) 877 break 878 if len(self) == l: 879 # no new certificate appended to self 880 break 881 882 def verifyChain(self, anchors, untrusted=None): 883 """ 884 Perform verification of certificate chains for that certificate. 885 A list of anchors is required. The certificates in the optional 886 untrusted list may be used as additional elements to the final chain. 887 On par with chain instantiation, only one chain constructed with the 888 untrusted candidates will be retained. Eventually, dates are checked. 889 """ 890 untrusted = untrusted or [] 891 for a in anchors: 892 chain = Chain(self + untrusted, a) 893 if len(chain) == 1: # anchor only 894 continue 895 # check that the chain does not exclusively rely on untrusted 896 found = False 897 for c in self: 898 if c in chain[1:]: 899 found = True 900 if found: 901 for c in chain: 902 if c.remainingDays() < 0: 903 break 904 if c is chain[-1]: # we got to the end of the chain 905 return chain 906 return None 907 908 def verifyChainFromCAFile(self, cafile, untrusted_file=None): 909 """ 910 Does the same job as .verifyChain() but using the list of anchors 911 from the cafile. As for .verifyChain(), a list of untrusted 912 certificates can be passed (as a file, this time). 913 """ 914 try: 915 f = open(cafile) 916 ca_certs = f.read() 917 f.close() 918 except: 919 raise Exception("Could not read from cafile") 920 921 anchors = [Cert(c) for c in split_pem(ca_certs)] 922 923 untrusted = None 924 if untrusted_file: 925 try: 926 f = open(untrusted_file) 927 untrusted_certs = f.read() 928 f.close() 929 except: 930 raise Exception("Could not read from untrusted_file") 931 untrusted = [Cert(c) for c in split_pem(untrusted_certs)] 932 933 return self.verifyChain(anchors, untrusted) 934 935 def verifyChainFromCAPath(self, capath, untrusted_file=None): 936 """ 937 Does the same job as .verifyChainFromCAFile() but using the list 938 of anchors in capath directory. The directory should (only) contain 939 certificates files in PEM format. As for .verifyChainFromCAFile(), 940 a list of untrusted certificates can be passed as a file 941 (concatenation of the certificates in PEM format). 942 """ 943 try: 944 anchors = [] 945 for cafile in os.listdir(capath): 946 anchors.append(Cert(open(cafile).read())) 947 except: 948 raise Exception("capath provided is not a valid cert path") 949 950 untrusted = None 951 if untrusted_file: 952 try: 953 f = open(untrusted_file) 954 untrusted_certs = f.read() 955 f.close() 956 except: 957 raise Exception("Could not read from untrusted_file") 958 untrusted = [Cert(c) for c in split_pem(untrusted_certs)] 959 960 return self.verifyChain(anchors, untrusted) 961 962 def __repr__(self): 963 llen = len(self) - 1 964 if llen < 0: 965 return "" 966 c = self[0] 967 s = "__ " 968 if not c.isSelfSigned(): 969 s += "%s [Not Self Signed]\n" % c.subject_str 970 else: 971 s += "%s [Self Signed]\n" % c.subject_str 972 idx = 1 973 while idx <= llen: 974 c = self[idx] 975 s += "%s\_ %s" % (" "*idx*2, c.subject_str) 976 if idx != llen: 977 s += "\n" 978 idx += 1 979 return s 980 981 982############################## 983# Certificate export helpers # 984############################## 985 986def _create_ca_file(anchor_list, filename): 987 """ 988 Concatenate all the certificates (PEM format for the export) in 989 'anchor_list' and write the result to file 'filename'. On success 990 'filename' is returned, None otherwise. 991 992 If you are used to OpenSSL tools, this function builds a CAfile 993 that can be used for certificate and CRL check. 994 """ 995 try: 996 f = open(filename, "w") 997 for a in anchor_list: 998 s = a.output(fmt="PEM") 999 f.write(s) 1000 f.close() 1001 except IOError: 1002 return None 1003 return filename 1004 1005