• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1## This file is part of Scapy
2## Copyright (C) 2008 Arnaud Ebalard <arnaud.ebalard@eads.net>
3##                                   <arno@natisbad.org>
4##   2015, 2016, 2017 Maxence Tury   <maxence.tury@ssi.gouv.fr>
5## This program is published under a GPLv2 license
6
7"""
8High-level methods for PKI objects (X.509 certificates, CRLs, asymmetric keys).
9Supports both RSA and ECDSA objects.
10
11The classes below are wrappers for the ASN.1 objects defined in x509.py.
12By collecting their attributes, we bypass the ASN.1 structure, hence
13there is no direct method for exporting a new full DER-encoded version
14of a Cert instance after its serial has been modified (for example).
15If you need to modify an import, just use the corresponding ASN1_Packet.
16
17For instance, here is what you could do in order to modify the serial of
18'cert' and then resign it with whatever 'key':
19    f = open('cert.der')
20    c = X509_Cert(f.read())
21    c.tbsCertificate.serialNumber = 0x4B1D
22    k = PrivKey('key.pem')
23    new_x509_cert = k.resignCert(c)
24No need for obnoxious openssl tweaking anymore. :)
25"""
26
from __future__ import absolute_import
from __future__ import print_function
import base64
import os
import time

from scapy.config import conf, crypto_validator
import scapy.modules.six as six
from scapy.modules.six.moves import range
if conf.crypto_valid:
    from cryptography.exceptions import InvalidSignature
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import serialization
    from cryptography.hazmat.primitives.asymmetric import rsa
    from cryptography.hazmat.primitives.asymmetric import ec

from scapy.error import warning
from scapy.utils import binrepr
from scapy.asn1.asn1 import ASN1_BIT_STRING
from scapy.asn1.mib import hash_by_oid
from scapy.layers.x509 import (X509_SubjectPublicKeyInfo,
                               RSAPublicKey, RSAPrivateKey,
                               ECDSAPublicKey, ECDSAPrivateKey,
                               RSAPrivateKey_OpenSSL, ECDSAPrivateKey_OpenSSL,
                               X509_Cert, X509_CRL)
from scapy.layers.tls.crypto.pkcs1 import (pkcs_os2ip, pkcs_i2osp, _get_hash,
                                           _EncryptAndVerifyRSA,
                                           _DecryptAndSignRSA)

from scapy.compat import *
55
# Maximum allowed sizes, in bytes, for files imported as keys, certificates
# and CRLs. They are passed to _PKIObjMaker as the size limit, so that a huge
# file is rejected before it is read.
_MAX_KEY_SIZE = 50*1024
_MAX_CERT_SIZE = 50*1024
_MAX_CRL_SIZE = 10*1024*1024   # some CRLs really are that big
61
62
63#####################################################################
64# Some helpers
65#####################################################################
66
67@conf.commands.register
def der2pem(der_string, obj="UNKNOWN"):
    """Convert DER octet string to PEM format (with optional header).

    :param der_string: the DER-encoded data, as a byte string.
    :param obj: label advertised in the BEGIN/END lines (e.g. "CERTIFICATE").
                Both text and byte strings are accepted.
    :return: the PEM encoding as a byte string, base64 body wrapped at
             64 characters per line and terminated by a newline.
    """
    # On Python 3, "%s" % b"LABEL" would render as "b'LABEL'" and corrupt
    # the header, so byte labels are turned into text first.
    if isinstance(obj, bytes) and not isinstance(obj, str):
        obj = obj.decode()
    # Encode a byte string in PEM format. Header advertises <obj> type.
    pem_string = ("-----BEGIN %s-----\n" % obj).encode()
    base64_string = base64.b64encode(der_string)
    chunks = [base64_string[i:i+64] for i in range(0, len(base64_string), 64)]
    pem_string += b'\n'.join(chunks)
    pem_string += ("\n-----END %s-----\n" % obj).encode()
    return pem_string
77
78@conf.commands.register
def pem2der(pem_string):
    """Convert PEM string to DER format.

    :param pem_string: byte string holding exactly one PEM-encoded object.
    :return: the base64-decoded content found between the header line
             and the footer line, as a byte string.
    :raises Exception: if a second "-----BEGIN" marker follows the header.
    """
    # Decode all lines between the first '-----\n' and the 2nd-to-last '-----'.
    pem_string = pem_string.replace(b"\r", b"")
    first_idx = pem_string.find(b"-----\n") + 6
    if pem_string.find(b"-----BEGIN", first_idx) != -1:
        raise Exception("pem2der() expects only one PEM-encoded object")
    last_idx = pem_string.rfind(b"-----", 0, pem_string.rfind(b"-----"))
    # bytes.replace() returns a new object: keep its result instead of
    # discarding it, so that the newlines are really stripped.
    base64_string = pem_string[first_idx:last_idx].replace(b"\n", b"")
    return base64.b64decode(base64_string)
91
def split_pem(s):
    """
    Split PEM objects. Useful to process concatenated certificates.

    :param s: byte string holding zero or more concatenated PEM objects.
    :return: list of byte strings, one per object, each running from its
             "-----BEGIN" marker to the end of its "-----END" line.

    Data located before, between or after the objects is ignored, and so
    is a trailing object which has no "-----END" line.
    """
    pem_strings = []
    while s:
        start_idx = s.find(b"-----BEGIN")
        if start_idx == -1:
            break
        # Only consider a footer located after the header we just found.
        end_idx = s.find(b"-----END", start_idx)
        if end_idx == -1:
            break
        eol_idx = s.find(b"\n", end_idx)
        # The last object may lack a trailing newline: take everything up
        # to the end of the data rather than looping forever on index 0.
        end_idx = len(s) if eol_idx == -1 else eol_idx + 1
        pem_strings.append(s[start_idx:end_idx])
        s = s[end_idx:]
    return pem_strings
106
107
108class _PKIObj(object):
109    def __init__(self, frmt, der, pem):
110        # Note that changing attributes of the _PKIObj does not update these
111        # values (e.g. modifying k.modulus does not change k.der).
112        #XXX use __setattr__ for this
113        self.frmt = frmt
114        self.der = der
115        self.pem = pem
116
117    def __str__(self):
118        return self.der
119
120
class _PKIObjMaker(type):
    """
    Base metaclass of the PKI wrappers: it turns a file path or raw data
    into a _PKIObj carrying 'frmt', 'der' and 'pem' attributes.
    """

    def __call__(cls, obj_path, obj_max_size, pem_marker=None):
        """
        :param obj_path: either the path of a file, or the data itself
                         (DER or PEM encoded).
        :param obj_max_size: maximum size in bytes accepted for a file.
        :param pem_marker: label used to build the 'pem' attribute when
                           the input is DER; when None, 'pem' is left empty.
        :return: a _PKIObj instance.
        :raises Exception: "Unable to import data" on any failure.
        """
        # This enables transparent DER and PEM-encoded data imports.
        # Note that when importing a PEM file with multiple objects (like ECDSA
        # private keys output by openssl), it will concatenate every object in
        # order to create a 'der' attribute. When converting a 'multi' DER file
        # into a PEM file, though, the PEM attribute will not be valid,
        # because we do not try to identify the class of each object.
        error_msg = "Unable to import data"

        if obj_path is None:
            raise Exception(error_msg)
        obj_path = raw(obj_path)

        # A NUL byte cannot be part of a path, so such input is raw data.
        if (b'\x00' not in obj_path) and os.path.isfile(obj_path):
            _size = os.path.getsize(obj_path)
            if _size > obj_max_size:
                raise Exception(error_msg)
            try:
                # The context manager closes the file even if read() fails.
                with open(obj_path, "rb") as f:
                    _raw = f.read()
            except Exception:
                raise Exception(error_msg)
        else:
            _raw = obj_path

        try:
            if b"-----BEGIN" in _raw:
                frmt = "PEM"
                pem = _raw
                der_list = split_pem(_raw)
                der = b''.join(map(pem2der, der_list))
            else:
                frmt = "DER"
                der = _raw
                pem = ""
                if pem_marker is not None:
                    pem = der2pem(_raw, pem_marker)
                # type identification may be needed for pem_marker
                # in such case, the pem attribute has to be updated
        except Exception:
            raise Exception(error_msg)

        return _PKIObj(frmt, der, pem)
167
168
169#####################################################################
170# PKI objects wrappers
171#####################################################################
172
173###############
174# Public Keys #
175###############
176
class _PubKeyFactory(_PKIObjMaker):
    """
    Metaclass for PubKey creation.
    It casts the appropriate class on the fly, then fills in
    the appropriate attributes with import_from_asn1pkt() submethod.
    """
    def __call__(cls, key_path=None):
        """
        :param key_path: None (a fresh key is generated), a tuple (handed
                         to PubKeyRSA.import_from_tuple()), or the path
                         to / content of an encoded public key.
        :return: a PubKeyRSA or PubKeyECDSA instance.
        :raises Exception: when the data cannot be imported.
        """
        if key_path is None:
            obj = type.__call__(cls)
            if cls is PubKey:
                cls = PubKeyRSA
            obj.__class__ = cls
            obj.frmt = "original"
            obj.fill_and_store()
            return obj

        # This deals with the rare RSA 'kx export' call.
        if isinstance(key_path, tuple):
            obj = type.__call__(cls)
            obj.__class__ = PubKeyRSA
            obj.frmt = "tuple"
            obj.import_from_tuple(key_path)
            return obj

        # Now for the usual calls, key_path may be the path to either:
        # _an X509_SubjectPublicKeyInfo, as processed by openssl;
        # _an RSAPublicKey;
        # _an ECDSAPublicKey.
        obj = _PKIObjMaker.__call__(cls, key_path, _MAX_KEY_SIZE)
        # PEM markers are text strings, as in _CertMaker and _CRLMaker:
        # a byte string would be formatted as "b'...'" on Python 3.
        try:
            spki = X509_SubjectPublicKeyInfo(obj.der)
            pubkey = spki.subjectPublicKey
            if isinstance(pubkey, RSAPublicKey):
                obj.__class__ = PubKeyRSA
                obj.import_from_asn1pkt(pubkey)
            elif isinstance(pubkey, ECDSAPublicKey):
                obj.__class__ = PubKeyECDSA
                try:
                    obj.import_from_der(obj.der)
                except ImportError:
                    pass
            else:
                # Not a key type we know: fall back to the bare RSA parsing.
                raise ValueError("Unsupported subjectPublicKey type")
            marker = "PUBLIC KEY"
        except Exception:
            try:
                pubkey = RSAPublicKey(obj.der)
                obj.__class__ = PubKeyRSA
                obj.import_from_asn1pkt(pubkey)
                marker = "RSA PUBLIC KEY"
            except Exception:
                # We cannot import an ECDSA public key without curve knowledge
                raise Exception("Unable to import public key")

        if obj.frmt == "DER":
            obj.pem = der2pem(obj.der, marker)
        return obj
235
236
class PubKey(six.with_metaclass(_PubKeyFactory, object)):
    """
    Common base of PubKeyRSA and PubKeyECDSA.
    It only brings the shared verifyCert() method; the verify() method it
    relies on is supplied by the subclasses.
    """

    def verifyCert(self, cert):
        """ Verifies either a Cert or an X509_Cert. """
        tbs = cert.tbsCertificate
        # The hash is the one named by the signature algorithm of the TBS.
        hash_name = hash_by_oid[tbs.signature.algorithm.val]
        signature = raw(cert.signatureValue)
        return self.verify(raw(tbs), signature, h=hash_name, t='pkcs')
250
251
class PubKeyRSA(PubKey, _EncryptAndVerifyRSA):
    """
    RSA public key wrapper, built upon _EncryptAndVerifyRSA (crypto/pkcs1.py).
    The underlying library object is stored in the 'pubkey' attribute.
    """
    @crypto_validator
    def fill_and_store(self, modulus=None, modulusLen=None, pubExp=None):
        """
        Set 'pubkey' from the given modulus and exponent. Without a modulus,
        a new key is generated (2048 bits unless modulusLen says otherwise).
        The public exponent defaults to 65537.
        """
        pubExp = pubExp or 65537
        if modulus:
            real_modulusLen = len(binrepr(modulus))
            if modulusLen and real_modulusLen != modulusLen:
                warning("modulus and modulusLen do not match!")
            numbers = rsa.RSAPublicNumbers(n=modulus, e=pubExp)
            self.pubkey = numbers.public_key(default_backend())
        else:
            real_modulusLen = modulusLen or 2048
            generated = rsa.generate_private_key(public_exponent=pubExp,
                                                 key_size=real_modulusLen,
                                                 backend=default_backend())
            self.pubkey = generated.public_key()
        # What follows only serves the legacy part of pkcs1.py
        numbers = self.pubkey.public_numbers()
        self._modulusLen = real_modulusLen
        self._modulus = numbers.n
        self._pubExp = numbers.e

    @crypto_validator
    def import_from_tuple(self, tup):
        """
        Build the key from an (exponent, modulus, modulus length) tuple.
        Exponent and modulus may be given as integers or as byte strings.
        """
        # this is rarely used
        exponent, modulus, _ = tup
        if isinstance(modulus, bytes):
            modulus = pkcs_os2ip(modulus)
        if isinstance(exponent, bytes):
            exponent = pkcs_os2ip(exponent)
        self.fill_and_store(modulus=modulus, pubExp=exponent)
        spki_format = serialization.PublicFormat.SubjectPublicKeyInfo
        self.pem = self.pubkey.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=spki_format)
        self.der = pem2der(self.pem)

    def import_from_asn1pkt(self, pubkey):
        """ Build the key from the fields of a parsed RSAPublicKey. """
        self.fill_and_store(modulus=pubkey.modulus.val,
                            pubExp=pubkey.publicExponent.val)

    def encrypt(self, msg, t="pkcs", h="sha256", mgf=None, L=None):
        # ECDSA offers no encryption, hence only RSA keywords show up here
        return _EncryptAndVerifyRSA.encrypt(self, msg, t, h, mgf, L)

    def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None):
        return _EncryptAndVerifyRSA.verify(self, msg, sig, t, h, mgf, L)
303
class PubKeyECDSA(PubKey):
    """
    Wrapper for ECDSA keys based on the cryptography library.
    Use the 'pubkey' attribute to access original object.
    """
    @crypto_validator
    def fill_and_store(self, curve=None):
        """
        Generate a new key pair and keep its public part in 'pubkey'.

        :param curve: curve class from the 'ec' module of the cryptography
                      library; defaults to ec.SECP256R1.
        """
        curve = curve or ec.SECP256R1
        private_key = ec.generate_private_key(curve(), default_backend())
        self.pubkey = private_key.public_key()

    @crypto_validator
    def import_from_der(self, pubkey):
        """ Load 'pubkey', a DER-encoded public key, into 'pubkey'. """
        # No lib support for explicit curves nor compressed points.
        self.pubkey = serialization.load_der_public_key(
            pubkey, backend=default_backend())

    def encrypt(self, msg, h="sha256", **kwargs):
        # cryptography lib does not support ECDSA encryption
        raise Exception("No ECDSA encryption support")

    @crypto_validator
    def verify(self, msg, sig, h="sha256", **kwargs):
        """
        Check the signature 'sig' of 'msg'.

        :param msg: the signed data.
        :param sig: DER-encoded signature, as per RFC 3279.
        :param h: name of the hash function.
        :param kwargs: RSA-specific keywords (e.g. 't'), accepted so that
                       callers can treat RSA and ECDSA keys alike; ignored.
        :return: True if the signature is valid, False otherwise.
        """
        # The library signals a bad signature with an exception and returns
        # nothing on success; callers expect a boolean.
        try:
            self.pubkey.verify(sig, msg, ec.ECDSA(_get_hash(h)))
        except InvalidSignature:
            return False
        return True
331
332
333################
334# Private Keys #
335################
336
class _PrivKeyFactory(_PKIObjMaker):
    """
    Metaclass for PrivKey creation.
    It casts the appropriate class on the fly, then fills in
    the appropriate attributes with import_from_asn1pkt() submethod.
    """
    def __call__(cls, key_path=None):
        """
        key_path may be the path to either:
            _an RSAPrivateKey_OpenSSL (as generated by openssl);
            _an ECDSAPrivateKey_OpenSSL (as generated by openssl);
            _an RSAPrivateKey;
            _an ECDSAPrivateKey.

        When key_path is None, a new key is generated instead (an ECDSA
        one if the call was made on PrivKey itself).

        Raises Exception if none of the formats above can be parsed.
        """
        if key_path is None:
            obj = type.__call__(cls)
            if cls is PrivKey:
                cls = PrivKeyECDSA
            obj.__class__ = cls
            obj.frmt = "original"
            obj.fill_and_store()
            return obj

        obj = _PKIObjMaker.__call__(cls, key_path, _MAX_KEY_SIZE)

        # Formats are tried in this order, the first successful parsing wins.
        # Each entry is: (ASN.1 parser, wrapper class, PEM marker,
        #                 key nested in a 'privateKey' field, multiPEM).
        # Markers are text strings: a byte string would be formatted as
        # "b'...'" by der2pem() on Python 3.
        candidates = (
            (RSAPrivateKey_OpenSSL, PrivKeyRSA, "PRIVATE KEY", True, False),
            (ECDSAPrivateKey_OpenSSL, PrivKeyECDSA, "EC PRIVATE KEY",
             True, True),
            (RSAPrivateKey, PrivKeyRSA, "RSA PRIVATE KEY", False, False),
            (ECDSAPrivateKey, PrivKeyECDSA, "EC PRIVATE KEY", False, False),
        )
        for parser, wrapper, marker, nested, multiPEM in candidates:
            try:
                privkey = parser(obj.der)
                if nested:
                    privkey = privkey.privateKey
                obj.__class__ = wrapper
            except Exception:
                continue
            break
        else:
            raise Exception("Unable to import private key")

        try:
            obj.import_from_asn1pkt(privkey)
        except ImportError:
            pass

        if obj.frmt == "DER":
            if multiPEM:
                # this does not restore the EC PARAMETERS header
                obj.pem = der2pem(raw(privkey), marker)
            else:
                obj.pem = der2pem(obj.der, marker)
        return obj
398
399
class PrivKey(six.with_metaclass(_PrivKeyFactory, object)):
    """
    Common base of PrivKeyRSA and PrivKeyECDSA.
    It brings the shared signTBSCert(), resignCert() and verifyCert()
    methods; sign() and verify() are supplied by the subclasses.
    """

    def signTBSCert(self, tbsCert, h="sha256"):
        """
        Sign 'tbsCert' and return the resulting X509_Cert.

        The signature field of the tbsCertificate is always copied into
        the signatureAlgorithm field of the result, whether or not its
        content (say, ecdsa-with-SHA512) is coherent with the signature
        actually computed (say, RSA with MD2).

        The computation of the signature relies on a small inheritance
        trick: the sign() methods of both PrivKeyRSA and PrivKeyECDSA
        accept any keyword, be it from the RSA or the ECDSA world, and
        only keep the ones they care about. Here, 't' eventually reaches
        pkcs1._DecryptAndSignRSA.sign().
        """
        algorithm = tbsCert.signature
        if not h:
            h = hash_by_oid[algorithm.algorithm.val]
        signature = self.sign(raw(tbsCert), h=h, t='pkcs')
        signed = X509_Cert()
        signed.tbsCertificate = tbsCert
        signed.signatureAlgorithm = algorithm
        signed.signatureValue = ASN1_BIT_STRING(signature, readable=True)
        return signed

    def resignCert(self, cert):
        """ Rewrite the signature of either a Cert or an X509_Cert. """
        tbs = cert.tbsCertificate
        return self.signTBSCert(tbs)

    def verifyCert(self, cert):
        """ Verifies either a Cert or an X509_Cert. """
        tbs = cert.tbsCertificate
        hash_name = hash_by_oid[tbs.signature.algorithm.val]
        signature = raw(cert.signatureValue)
        return self.verify(raw(tbs), signature, h=hash_name, t='pkcs')
440
441
class PrivKeyRSA(PrivKey, _EncryptAndVerifyRSA, _DecryptAndSignRSA):
    """
    RSA private key wrapper, built upon _DecryptAndSignRSA (crypto/pkcs1.py).
    The underlying library object is stored in the 'key' attribute, and the
    matching public key in 'pubkey'.
    """
    @crypto_validator
    def fill_and_store(self, modulus=None, modulusLen=None, pubExp=None,
                       prime1=None, prime2=None, coefficient=None,
                       exponent1=None, exponent2=None, privExp=None):
        """
        Set 'key' and 'pubkey' from the given RSA parameters. As soon as
        one of them is missing, a whole new key is generated (2048 bits
        unless modulusLen says otherwise). The public exponent defaults
        to 65537.
        """
        pubExp = pubExp or 65537
        # The library needs every single parameter in order to build
        # RSAPrivateNumbers(...): there is no partial import.
        required = (modulus, prime1, prime2, coefficient, privExp,
                    exponent1, exponent2)
        if None not in required:
            real_modulusLen = len(binrepr(modulus))
            if modulusLen and real_modulusLen != modulusLen:
                warning("modulus and modulusLen do not match!")
            public_numbers = rsa.RSAPublicNumbers(n=modulus, e=pubExp)
            private_numbers = rsa.RSAPrivateNumbers(
                p=prime1, q=prime2,
                dmp1=exponent1, dmq1=exponent2,
                iqmp=coefficient, d=privExp,
                public_numbers=public_numbers)
            self.key = private_numbers.private_key(default_backend())
        else:
            real_modulusLen = modulusLen or 2048
            self.key = rsa.generate_private_key(public_exponent=pubExp,
                                                key_size=real_modulusLen,
                                                backend=default_backend())
        self.pubkey = self.key.public_key()

        # What follows only serves the legacy part of pkcs1.py
        numbers = self.pubkey.public_numbers()
        self._modulusLen = real_modulusLen
        self._modulus = numbers.n
        self._pubExp = numbers.e

    def import_from_asn1pkt(self, privkey):
        """ Build the key from the fields of a parsed RSAPrivateKey. """
        self.fill_and_store(modulus=privkey.modulus.val,
                            pubExp=privkey.publicExponent.val,
                            privExp=privkey.privateExponent.val,
                            prime1=privkey.prime1.val,
                            prime2=privkey.prime2.val,
                            exponent1=privkey.exponent1.val,
                            exponent2=privkey.exponent2.val,
                            coefficient=privkey.coefficient.val)

    def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None):
        # Same as in PubKeyRSA, copied here to spare one more base class :)
        return _EncryptAndVerifyRSA.verify(self, msg, sig, t, h, mgf, L)

    def sign(self, data, t="pkcs", h="sha256", mgf=None, L=None):
        return _DecryptAndSignRSA.sign(self, data, t, h, mgf, L)
500
501
class PrivKeyECDSA(PrivKey):
    """
    Wrapper for ECDSA keys based on the cryptography library.
    Use the 'key' attribute to access original object
    (and 'pubkey' for the matching public key).
    """
    @crypto_validator
    def fill_and_store(self, curve=None):
        """
        Generate a new key pair, stored in 'key' and 'pubkey'.

        :param curve: curve class from the 'ec' module of the cryptography
                      library; defaults to ec.SECP256R1.
        """
        curve = curve or ec.SECP256R1
        self.key = ec.generate_private_key(curve(), default_backend())
        self.pubkey = self.key.public_key()

    @crypto_validator
    def import_from_asn1pkt(self, privkey):
        """ Load the key from 'privkey', a parsed ASN.1 private key. """
        self.key = serialization.load_der_private_key(
            raw(privkey), None, backend=default_backend())
        self.pubkey = self.key.public_key()

    @crypto_validator
    def verify(self, msg, sig, h="sha256", **kwargs):
        """
        Check the signature 'sig' of 'msg' with the public part of the key.

        :param sig: DER-encoded signature, as per RFC 3279.
        :param h: name of the hash function.
        :param kwargs: RSA-specific keywords (e.g. 't'); ignored.
        :return: True if the signature is valid, False otherwise.
        """
        # The library signals a bad signature with an exception and returns
        # nothing on success; callers expect a boolean.
        try:
            self.pubkey.verify(sig, msg, ec.ECDSA(_get_hash(h)))
        except InvalidSignature:
            return False
        return True

    @crypto_validator
    def sign(self, data, h="sha256", **kwargs):
        """
        Sign 'data' and return the DER-encoded signature.

        :param h: name of the hash function.
        :param kwargs: RSA-specific keywords (e.g. 't'); ignored.
        """
        return self.key.sign(data, ec.ECDSA(_get_hash(h)))
531
532
533################
534# Certificates #
535################
536
class _CertMaker(_PKIObjMaker):
    """
    Metaclass for Cert creation. Unlike for the keys, there is no class to
    choose here; the model is merely reused so as not to write redundant
    constructors.
    """
    def __call__(cls, cert_path):
        wrapper = _PKIObjMaker.__call__(cls, cert_path,
                                        _MAX_CERT_SIZE, "CERTIFICATE")
        wrapper.__class__ = Cert
        try:
            parsed = X509_Cert(wrapper.der)
        except:
            raise Exception("Unable to import certificate")
        wrapper.import_from_asn1pkt(parsed)
        return wrapper
552
553
class Cert(six.with_metaclass(_CertMaker, object)):
    """
    Wrapper for the X509_Cert from layers/x509.py.
    Use the 'x509Cert' attribute to access original object.
    """

    def import_from_asn1pkt(self, cert):
        """
        Collect the attributes of the wrapper from 'cert', a parsed
        X509_Cert. Raises Exception if a validity date cannot be parsed.
        """
        error_msg = "Unable to import certificate"

        self.x509Cert = cert

        tbsCert = cert.tbsCertificate
        self.tbsCertificate = tbsCert

        if tbsCert.version:
            self.version = tbsCert.version.val + 1
        else:
            self.version = 1
        self.serial = tbsCert.serialNumber.val
        self.sigAlg = tbsCert.signature.algorithm.oidname
        self.issuer = tbsCert.get_issuer()
        self.issuer_str = tbsCert.get_issuer_str()
        self.issuer_hash = hash(self.issuer_str)
        self.subject = tbsCert.get_subject()
        self.subject_str = tbsCert.get_subject_str()
        self.subject_hash = hash(self.subject_str)

        self.notBefore_str = tbsCert.validity.not_before.pretty_time
        notBefore = tbsCert.validity.not_before.val
        if notBefore[-1] == "Z":
            notBefore = notBefore[:-1]
        try:
            self.notBefore = time.strptime(notBefore, "%y%m%d%H%M%S")
        except Exception:
            raise Exception(error_msg)
        self.notBefore_str_simple = time.strftime("%x", self.notBefore)

        self.notAfter_str = tbsCert.validity.not_after.pretty_time
        notAfter = tbsCert.validity.not_after.val
        if notAfter[-1] == "Z":
            notAfter = notAfter[:-1]
        try:
            self.notAfter = time.strptime(notAfter, "%y%m%d%H%M%S")
        except Exception:
            raise Exception(error_msg)
        self.notAfter_str_simple = time.strftime("%x", self.notAfter)

        self.pubKey = PubKey(raw(tbsCert.subjectPublicKeyInfo))

        # isRevoked() compares this attribute to None, so it has to exist
        # even when the certificate carries no such extension.
        self.authorityKeyID = None
        if tbsCert.extensions:
            for extn in tbsCert.extensions:
                if extn.extnID.oidname == "basicConstraints":
                    self.cA = False
                    if extn.extnValue.cA:
                        self.cA = not (extn.extnValue.cA.val == 0)
                elif extn.extnID.oidname == "keyUsage":
                    self.keyUsage = extn.extnValue.get_keyUsage()
                elif extn.extnID.oidname == "extKeyUsage":
                    self.extKeyUsage = extn.extnValue.get_extendedKeyUsage()
                elif extn.extnID.oidname == "authorityKeyIdentifier":
                    self.authorityKeyID = extn.extnValue.keyIdentifier.val

        self.signatureValue = raw(cert.signatureValue)
        self.signatureLen = len(self.signatureValue)

    def isIssuerCert(self, other):
        """
        True if 'other' issued 'self', i.e.:
          - self.issuer == other.subject
          - self is signed by other
        """
        if self.issuer_hash != other.subject_hash:
            return False
        return other.pubKey.verifyCert(self)

    def isSelfSigned(self):
        """
        Return True if the certificate is self-signed:
          - issuer and subject are the same
          - the signature of the certificate is valid.
        """
        if self.issuer_hash == self.subject_hash:
            return self.isIssuerCert(self)
        return False

    def encrypt(self, msg, t="pkcs", h="sha256", mgf=None, L=None):
        # no ECDSA *encryption* support, hence only RSA specific keywords here
        # Keywords (rather than positions) let a PubKeyECDSA ignore the
        # RSA-only arguments and raise its own, explicit error.
        return self.pubKey.encrypt(msg, t=t, h=h, mgf=mgf, L=L)

    def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None):
        # Keywords (rather than positions) so that this works with both
        # PubKeyRSA.verify() and PubKeyECDSA.verify(), whose positional
        # parameters differ.
        return self.pubKey.verify(msg, sig, t=t, h=h, mgf=mgf, L=L)

    def remainingDays(self, now=None):
        """
        Based on the value of notAfter field, returns the number of
        days the certificate will still be valid. The date used for the
        comparison is the current and local date, as returned by
        time.localtime(), except if 'now' argument is provided another
        one. 'now' argument can be given as either a time tuple or a string
        representing the date. Accepted format for the string version
        are:

         - '%b %d %H:%M:%S %Y %Z' e.g. 'Jan 30 07:38:59 2008 GMT'
         - '%m/%d/%y' e.g. '01/30/08' (less precise)

        If the certificate is no more valid at the date considered, then
        a negative value is returned representing the number of days
        since it has expired.

        The number of days is returned as a float to deal with the unlikely
        case of certificates that are still just valid.
        """
        if now is None:
            now = time.localtime()
        elif isinstance(now, str):
            try:
                if '/' in now:
                    now = time.strptime(now, '%m/%d/%y')
                else:
                    now = time.strptime(now, '%b %d %H:%M:%S %Y %Z')
            except Exception:
                warning("Bad time string provided, will use localtime() instead.")
                now = time.localtime()

        now = time.mktime(now)
        nft = time.mktime(self.notAfter)
        diff = (nft - now)/(24.*3600)
        return diff

    def isRevoked(self, crl_list):
        """
        Given a list of trusted CRL (their signature has already been
        verified with trusted anchors), this function returns True if
        the certificate is marked as revoked by one of those CRL.

        Note that if the Certificate was on hold in a previous CRL and
        is now valid again in a new CRL and both are in the list, it
        will be considered revoked: this is because _all_ CRLs are
        checked (not only the freshest) and revocation status is not
        handled.

        Also note that the check on the issuer is performed on the
        Authority Key Identifier if available in _both_ the CRL and the
        Cert. Otherwise, the issuers are simply compared.
        """
        cert_akid = getattr(self, "authorityKeyID", None)
        for c in crl_list:
            # Not every CRL object carries the attribute: treat a missing
            # one as "no Authority Key Identifier available".
            crl_akid = getattr(c, "authorityKeyID", None)
            same_issuer = ((cert_akid is not None and
                            crl_akid is not None and
                            cert_akid == crl_akid) or
                           self.issuer == c.issuer)
            if not same_issuer:
                continue
            # Keep going when this CRL does not list the certificate:
            # a later CRL from the same issuer may still revoke it.
            if any(self.serial == x[0] for x in c.revoked_cert_serials):
                return True
        return False

    def export(self, filename, fmt="DER"):
        """
        Export certificate in 'fmt' format (DER or PEM) to file 'filename'.
        Any other value of 'fmt' leaves the file empty.
        """
        # The context manager closes the file even if the write fails.
        with open(filename, "wb") as f:
            if fmt == "DER":
                f.write(self.der)
            elif fmt == "PEM":
                f.write(self.pem)

    def show(self):
        """ Print the serial, issuer, subject and validity period. """
        print("Serial: %s" % self.serial)
        print("Issuer: " + self.issuer_str)
        print("Subject: " + self.subject_str)
        print("Validity: %s to %s" % (self.notBefore_str, self.notAfter_str))

    def __repr__(self):
        return "[X.509 Cert. Subject:%s, Issuer:%s]" % (self.subject_str, self.issuer_str)
727
728
729################################
730# Certificate Revocation Lists #
731################################
732
class _CRLMaker(_PKIObjMaker):
    """
    Metaclass for CRL creation. Unlike for the keys, there is no class to
    choose here; the model is merely reused so as not to write redundant
    constructors.
    """
    def __call__(cls, cert_path):
        wrapper = _PKIObjMaker.__call__(cls, cert_path,
                                        _MAX_CRL_SIZE, "X509 CRL")
        wrapper.__class__ = CRL
        try:
            parsed = X509_CRL(wrapper.der)
        except:
            raise Exception("Unable to import CRL")
        wrapper.import_from_asn1pkt(parsed)
        return wrapper
747
748
class CRL(six.with_metaclass(_CRLMaker, object)):
    """
    Wrapper for the X509_CRL from layers/x509.py.
    Use the 'x509CRL' attribute to access original object.

    The other attributes are filled in by import_from_asn1pkt().
    """

    @staticmethod
    def _parse_time(val, error_msg):
        """
        Drop the optional trailing "Z" from 'val' and parse the remainder
        with the "%y%m%d%H%M%S" format.

        Return a (stripped string, time.struct_time) tuple.
        Raise Exception(error_msg) when the value does not match the format.
        """
        # Slicing instead of indexing: an empty value does not raise
        # IndexError here, it is rejected by strptime() just below.
        if val[-1:] == "Z":
            val = val[:-1]
        try:
            parsed = time.strptime(val, "%y%m%d%H%M%S")
        except Exception:
            raise Exception(error_msg)
        return val, parsed

    def import_from_asn1pkt(self, crl):
        """
        Copy the fields of the X509_CRL packet 'crl' into attributes of self.

        Attributes set: x509CRL, tbsCertList (result of raw() on the
        to-be-signed part), version, sigAlg, issuer, issuer_str, issuer_hash,
        lastUpdate, lastUpdate_str, lastUpdate_str_simple, nextUpdate,
        nextUpdate_str, nextUpdate_str_simple (the three are None when the
        CRL has no nextUpdate field), number (None when there is no cRLNumber
        extension), revoked_cert_serials, signatureValue and signatureLen.

        Raise Exception("Unable to import CRL") if one of the dates cannot
        be parsed.
        """
        error_msg = "Unable to import CRL"

        self.x509CRL = crl

        tbsCertList = crl.tbsCertList
        self.tbsCertList = raw(tbsCertList)

        # The encoded version is zero-based; an absent field means v1.
        if tbsCertList.version:
            self.version = tbsCertList.version.val + 1
        else:
            self.version = 1
        self.sigAlg = tbsCertList.signature.algorithm.oidname
        self.issuer = tbsCertList.get_issuer()
        self.issuer_str = tbsCertList.get_issuer_str()
        self.issuer_hash = hash(self.issuer_str)

        self.lastUpdate_str = tbsCertList.this_update.pretty_time
        _, self.lastUpdate = self._parse_time(tbsCertList.this_update.val,
                                              error_msg)
        self.lastUpdate_str_simple = time.strftime("%x", self.lastUpdate)

        # nextUpdate is optional: always define the three attributes so that
        # show() and other readers never hit an AttributeError.
        self.nextUpdate = None
        self.nextUpdate_str = None
        self.nextUpdate_str_simple = None
        if tbsCertList.next_update:
            self.nextUpdate_str = tbsCertList.next_update.pretty_time
            _, self.nextUpdate = self._parse_time(
                tbsCertList.next_update.val, error_msg)
            self.nextUpdate_str_simple = time.strftime("%x", self.nextUpdate)

        # Same reasoning for the optional cRLNumber extension.
        self.number = None
        if tbsCertList.crlExtensions:
            for extension in tbsCertList.crlExtensions:
                if extension.extnID.oidname == "cRLNumber":
                    self.number = extension.extnValue.cRLNumber.val

        revoked = []
        if tbsCertList.revokedCertificates:
            for cert in tbsCertList.revokedCertificates:
                serial = cert.serialNumber.val
                # The date is parsed only to validate it; the entry keeps
                # the string form (without the trailing "Z").
                date, _ = self._parse_time(cert.revocationDate.val, error_msg)
                revoked.append((serial, date))
        self.revoked_cert_serials = revoked

        self.signatureValue = raw(crl.signatureValue)
        self.signatureLen = len(self.signatureValue)

    def isIssuerCert(self, other):
        """
        Return True if 'other' is the issuer of this CRL: its subject_hash
        must equal our issuer_hash and other.pubKey.verifyCert(self) must
        succeed.
        """
        # This is exactly the same thing as in Cert method.
        if self.issuer_hash != other.subject_hash:
            return False
        return other.pubKey.verifyCert(self)

    def verify(self, anchors):
        """
        Return True iff the CRL is signed by one of the provided anchors.
        """
        for a in anchors:
            if self.isIssuerCert(a):
                return True
        return False

    def show(self):
        """
        Print the version, signature algorithm, issuer and update dates.
        nextUpdate is printed as None when the CRL does not carry one.
        """
        print("Version: %d" % self.version)
        print("sigAlg: " + self.sigAlg)
        print("Issuer: " + self.issuer_str)
        print("lastUpdate: %s" % self.lastUpdate_str)
        print("nextUpdate: %s" % self.nextUpdate_str)
836
837
838######################
839# Certificate chains #
840######################
841
class Chain(list):
    """
    Basically, an enhanced array of Cert.
    """
    def __init__(self, certList, cert0=None):
        """
        Construct a chain of certificates starting with a self-signed
        certificate (or any certificate submitted by the user)
        and following issuer/subject matching and signature validity.
        If there is exactly one chain to be constructed, it will be,
        but if there are multiple potential chains, there is no guarantee
        that the retained one will be the longest one.
        As Cert and CRL classes both share an isIssuerCert() method,
        the trailing element of a Chain may alternatively be a CRL.

        Note that we do not check AKID/{SKID/issuer/serial} matching,
        nor the presence of keyCertSign in keyUsage extension (if present).

        'certList' is only read: candidates are consumed from a private
        copy, so the caller's list is left untouched.
        """
        list.__init__(self, ())
        candidates = list(certList)

        if cert0:
            self.append(cert0)
        else:
            # No starting point given: use the first self-signed candidate.
            for root_candidate in candidates:
                if root_candidate.isSelfSigned():
                    self.append(root_candidate)
                    candidates.remove(root_candidate)
                    break

        if len(self) > 0:
            while candidates:
                for c in candidates:
                    if c.isIssuerCert(self[-1]):
                        self.append(c)
                        candidates.remove(c)
                        break
                else:
                    # no new certificate appended to self
                    break

    @staticmethod
    def _load_pem_certs(path, error_msg):
        """
        Read the file at 'path' and return one Cert per PEM block found
        by split_pem(). Raise Exception(error_msg) if the file cannot be
        read; errors raised while building the Cert objects propagate.
        """
        try:
            with open(path) as f:
                pem_data = f.read()
        except Exception:
            raise Exception(error_msg)
        return [Cert(c) for c in split_pem(pem_data)]

    def verifyChain(self, anchors, untrusted=None):
        """
        Perform verification of certificate chains for that certificate.
        A list of anchors is required. The certificates in the optional
        untrusted list may be used as additional elements to the final chain.
        On par with chain instantiation, only one chain constructed with the
        untrusted candidates will be retained. Eventually, dates are checked.

        Return the first chain (anchor first) which contains at least one
        element of self and whose elements all have a non-negative
        remainingDays(); return None if no anchor yields such a chain.
        """
        untrusted = untrusted or []
        for a in anchors:
            chain = Chain(self + untrusted, a)
            if len(chain) == 1:             # anchor only
                continue
            # check that the chain does not exclusively rely on untrusted
            beyond_anchor = chain[1:]
            if not any(c in beyond_anchor for c in self):
                continue
            # Every element must still be valid, the last one included.
            if not any(c.remainingDays() < 0 for c in chain):
                return chain
        return None

    def verifyChainFromCAFile(self, cafile, untrusted_file=None):
        """
        Does the same job as .verifyChain() but using the list of anchors
        from the cafile. As for .verifyChain(), a list of untrusted
        certificates can be passed (as a file, this time).

        Raise Exception if cafile or untrusted_file cannot be read.
        """
        anchors = self._load_pem_certs(cafile, "Could not read from cafile")

        untrusted = None
        if untrusted_file:
            untrusted = self._load_pem_certs(
                untrusted_file, "Could not read from untrusted_file")

        return self.verifyChain(anchors, untrusted)

    def verifyChainFromCAPath(self, capath, untrusted_file=None):
        """
        Does the same job as .verifyChainFromCAFile() but using the list
        of anchors in capath directory. The directory should (only) contain
        certificates files in PEM format. As for .verifyChainFromCAFile(),
        a list of untrusted certificates can be passed as a file
        (concatenation of the certificates in PEM format).

        Raise Exception if the directory cannot be listed, if one of its
        entries cannot be read or imported as a Cert, or if untrusted_file
        cannot be read.
        """
        try:
            anchors = []
            for cafile in os.listdir(capath):
                # os.listdir() yields bare names: resolve them against
                # capath, not against the current working directory.
                with open(os.path.join(capath, cafile)) as f:
                    anchors.append(Cert(f.read()))
        except Exception:
            raise Exception("capath provided is not a valid cert path")

        untrusted = None
        if untrusted_file:
            untrusted = self._load_pem_certs(
                untrusted_file, "Could not read from untrusted_file")

        return self.verifyChain(anchors, untrusted)

    def __repr__(self):
        """
        Return a tree-like view of the chain, one subject per line, each
        level indented by two more spaces. An empty chain gives "".
        """
        if not self:
            return ""
        first = self[0]
        if not first.isSelfSigned():
            head = "__ %s [Not Self Signed]\n" % first.subject_str
        else:
            head = "__ %s [Self Signed]\n" % first.subject_str
        # The backslash is doubled in the source to avoid an invalid escape
        # sequence; the resulting text is unchanged.
        tail = ["%s\\_ %s" % (" " * idx * 2, self[idx].subject_str)
                for idx in range(1, len(self))]
        return head + "\n".join(tail)
980
981
982##############################
983# Certificate export helpers #
984##############################
985
def _create_ca_file(anchor_list, filename):
    """
    Concatenate all the certificates (PEM format for the export) in
    'anchor_list' and write the result to file 'filename'. On success
    'filename' is returned; None is returned if an IOError occurs while
    opening or writing the file.

    Each element of 'anchor_list' must provide an output(fmt="PEM") method
    whose result can be written to a file opened in text mode.

    If you are used to OpenSSL tools, this function builds a CAfile
    that can be used for certificate and CRL check.
    """
    try:
        # The context manager closes the file even when an export or a
        # write fails halfway through.
        with open(filename, "w") as f:
            for a in anchor_list:
                f.write(a.output(fmt="PEM"))
    except IOError:
        return None
    return filename
1004
1005