• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) 2008 Arnaud Ebalard <arnaud.ebalard@eads.net>
5#                                   <arno@natisbad.org>
6#   2015, 2016, 2017 Maxence Tury   <maxence.tury@ssi.gouv.fr>
7
8"""
9High-level methods for PKI objects (X.509 certificates, CRLs, asymmetric keys).
10Supports both RSA and ECDSA objects.
11
12The classes below are wrappers for the ASN.1 objects defined in x509.py.
13By collecting their attributes, we bypass the ASN.1 structure, hence
14there is no direct method for exporting a new full DER-encoded version
15of a Cert instance after its serial has been modified (for example).
16If you need to modify an import, just use the corresponding ASN1_Packet.
17
18For instance, here is what you could do in order to modify the serial of
19'cert' and then resign it with whatever 'key'::
20
21    f = open('cert.der')
22    c = X509_Cert(f.read())
23    c.tbsCertificate.serialNumber = 0x4B1D
24    k = PrivKey('key.pem')
25    new_x509_cert = k.resignCert(c)
26
27No need for obnoxious openssl tweaking anymore. :)
28"""
29
30import base64
31import os
32import time
33
34from scapy.config import conf, crypto_validator
35from scapy.error import warning
36from scapy.utils import binrepr
37from scapy.asn1.asn1 import ASN1_BIT_STRING
38from scapy.asn1.mib import hash_by_oid
39from scapy.layers.x509 import (
40    ECDSAPrivateKey_OpenSSL,
41    ECDSAPrivateKey,
42    ECDSAPublicKey,
43    EdDSAPublicKey,
44    EdDSAPrivateKey,
45    RSAPrivateKey_OpenSSL,
46    RSAPrivateKey,
47    RSAPublicKey,
48    X509_Cert,
49    X509_CRL,
50    X509_SubjectPublicKeyInfo,
51)
52from scapy.layers.tls.crypto.pkcs1 import pkcs_os2ip, _get_hash, \
53    _EncryptAndVerifyRSA, _DecryptAndSignRSA
54from scapy.compat import raw, bytes_encode
55
56if conf.crypto_valid:
57    from cryptography.exceptions import InvalidSignature
58    from cryptography.hazmat.backends import default_backend
59    from cryptography.hazmat.primitives import serialization
60    from cryptography.hazmat.primitives.asymmetric import rsa, ec, x25519
61
62    # cryptography raised the minimum RSA key length to 1024 in 43.0+
63    # https://github.com/pyca/cryptography/pull/10278
64    # but we need still 512 for EXPORT40 ciphers (yes EXPORT is terrible)
65    # https://datatracker.ietf.org/doc/html/rfc2246#autoid-66
66    # The following detects the change and hacks around it using the backend
67
68    try:
69        rsa.generate_private_key(public_exponent=65537, key_size=512)
70        _RSA_512_SUPPORTED = True
71    except ValueError:
72        # cryptography > 43.0
73        _RSA_512_SUPPORTED = False
74        from cryptography.hazmat.primitives.asymmetric.rsa import rust_openssl
75
76
77# Maximum allowed size in bytes for a certificate file, to avoid
78# loading huge file when importing a cert
79_MAX_KEY_SIZE = 50 * 1024
80_MAX_CERT_SIZE = 50 * 1024
81_MAX_CRL_SIZE = 10 * 1024 * 1024   # some are that big
82
83
84#####################################################################
85# Some helpers
86#####################################################################
87
88@conf.commands.register
89def der2pem(der_string, obj="UNKNOWN"):
90    """Convert DER octet string to PEM format (with optional header)"""
91    # Encode a byte string in PEM format. Header advertises <obj> type.
92    pem_string = ("-----BEGIN %s-----\n" % obj).encode()
93    base64_string = base64.b64encode(der_string)
94    chunks = [base64_string[i:i + 64] for i in range(0, len(base64_string), 64)]  # noqa: E501
95    pem_string += b'\n'.join(chunks)
96    pem_string += ("\n-----END %s-----\n" % obj).encode()
97    return pem_string
98
99
100@conf.commands.register
101def pem2der(pem_string):
102    """Convert PEM string to DER format"""
103    # Encode all lines between the first '-----\n' and the 2nd-to-last '-----'.
104    pem_string = pem_string.replace(b"\r", b"")
105    first_idx = pem_string.find(b"-----\n") + 6
106    if pem_string.find(b"-----BEGIN", first_idx) != -1:
107        raise Exception("pem2der() expects only one PEM-encoded object")
108    last_idx = pem_string.rfind(b"-----", 0, pem_string.rfind(b"-----"))
109    base64_string = pem_string[first_idx:last_idx]
110    base64_string.replace(b"\n", b"")
111    der_string = base64.b64decode(base64_string)
112    return der_string
113
114
115def split_pem(s):
116    """
117    Split PEM objects. Useful to process concatenated certificates.
118    """
119    pem_strings = []
120    while s != b"":
121        start_idx = s.find(b"-----BEGIN")
122        if start_idx == -1:
123            break
124        end_idx = s.find(b"-----END")
125        if end_idx == -1:
126            raise Exception("Invalid PEM object (missing END tag)")
127        end_idx = s.find(b"\n", end_idx) + 1
128        if end_idx == 0:
129            # There is no final \n
130            end_idx = len(s)
131        pem_strings.append(s[start_idx:end_idx])
132        s = s[end_idx:]
133    return pem_strings
134
135
136class _PKIObj(object):
137    def __init__(self, frmt, der, pem):
138        # Note that changing attributes of the _PKIObj does not update these
139        # values (e.g. modifying k.modulus does not change k.der).
140        # XXX use __setattr__ for this
141        self.frmt = frmt
142        self.der = der
143        self.pem = pem
144
145    def __str__(self):
146        return self.der
147
148
149class _PKIObjMaker(type):
150    def __call__(cls, obj_path, obj_max_size, pem_marker=None):
151        # This enables transparent DER and PEM-encoded data imports.
152        # Note that when importing a PEM file with multiple objects (like ECDSA
153        # private keys output by openssl), it will concatenate every object in
154        # order to create a 'der' attribute. When converting a 'multi' DER file
155        # into a PEM file, though, the PEM attribute will not be valid,
156        # because we do not try to identify the class of each object.
157        error_msg = "Unable to import data"
158
159        if obj_path is None:
160            raise Exception(error_msg)
161        obj_path = bytes_encode(obj_path)
162
163        if (b'\x00' not in obj_path) and os.path.isfile(obj_path):
164            _size = os.path.getsize(obj_path)
165            if _size > obj_max_size:
166                raise Exception(error_msg)
167            try:
168                with open(obj_path, "rb") as f:
169                    _raw = f.read()
170            except Exception:
171                raise Exception(error_msg)
172        else:
173            _raw = obj_path
174
175        try:
176            if b"-----BEGIN" in _raw:
177                frmt = "PEM"
178                pem = _raw
179                der_list = split_pem(_raw)
180                der = b''.join(map(pem2der, der_list))
181            else:
182                frmt = "DER"
183                der = _raw
184                pem = ""
185                if pem_marker is not None:
186                    pem = der2pem(_raw, pem_marker)
187                # type identification may be needed for pem_marker
188                # in such case, the pem attribute has to be updated
189        except Exception:
190            raise Exception(error_msg)
191
192        p = _PKIObj(frmt, der, pem)
193        return p
194
195
196#####################################################################
197# PKI objects wrappers
198#####################################################################
199
200###############
201# Public Keys #
202###############
203
204class _PubKeyFactory(_PKIObjMaker):
205    """
206    Metaclass for PubKey creation.
207    It casts the appropriate class on the fly, then fills in
208    the appropriate attributes with import_from_asn1pkt() submethod.
209    """
210    def __call__(cls, key_path=None):
211
212        if key_path is None:
213            obj = type.__call__(cls)
214            if cls is PubKey:
215                cls = PubKeyRSA
216            obj.__class__ = cls
217            obj.frmt = "original"
218            obj.fill_and_store()
219            return obj
220
221        # This deals with the rare RSA 'kx export' call.
222        if isinstance(key_path, tuple):
223            obj = type.__call__(cls)
224            obj.__class__ = PubKeyRSA
225            obj.frmt = "tuple"
226            obj.import_from_tuple(key_path)
227            return obj
228
229        # Now for the usual calls, key_path may be the path to either:
230        # _an X509_SubjectPublicKeyInfo, as processed by openssl;
231        # _an RSAPublicKey;
232        # _an ECDSAPublicKey;
233        # _an EdDSAPublicKey.
234        obj = _PKIObjMaker.__call__(cls, key_path, _MAX_KEY_SIZE)
235        try:
236            spki = X509_SubjectPublicKeyInfo(obj.der)
237            pubkey = spki.subjectPublicKey
238            if isinstance(pubkey, RSAPublicKey):
239                obj.__class__ = PubKeyRSA
240                obj.import_from_asn1pkt(pubkey)
241            elif isinstance(pubkey, ECDSAPublicKey):
242                obj.__class__ = PubKeyECDSA
243                obj.import_from_der(obj.der)
244            elif isinstance(pubkey, EdDSAPublicKey):
245                obj.__class__ = PubKeyEdDSA
246                obj.import_from_der(obj.der)
247            else:
248                raise
249            marker = b"PUBLIC KEY"
250        except Exception:
251            try:
252                pubkey = RSAPublicKey(obj.der)
253                obj.__class__ = PubKeyRSA
254                obj.import_from_asn1pkt(pubkey)
255                marker = b"RSA PUBLIC KEY"
256            except Exception:
257                # We cannot import an ECDSA public key without curve knowledge
258                if conf.debug_dissector:
259                    raise
260                raise Exception("Unable to import public key")
261
262        if obj.frmt == "DER":
263            obj.pem = der2pem(obj.der, marker)
264        return obj
265
266
267class PubKey(metaclass=_PubKeyFactory):
268    """
269    Parent class for both PubKeyRSA and PubKeyECDSA.
270    Provides a common verifyCert() method.
271    """
272
273    def verifyCert(self, cert):
274        """ Verifies either a Cert or an X509_Cert. """
275        tbsCert = cert.tbsCertificate
276        sigAlg = tbsCert.signature
277        h = hash_by_oid[sigAlg.algorithm.val]
278        sigVal = raw(cert.signatureValue)
279        return self.verify(raw(tbsCert), sigVal, h=h, t='pkcs')
280
281
282class PubKeyRSA(PubKey, _EncryptAndVerifyRSA):
283    """
284    Wrapper for RSA keys based on _EncryptAndVerifyRSA from crypto/pkcs1.py
285    Use the 'key' attribute to access original object.
286    """
287    @crypto_validator
288    def fill_and_store(self, modulus=None, modulusLen=None, pubExp=None):
289        pubExp = pubExp or 65537
290        if not modulus:
291            real_modulusLen = modulusLen or 2048
292            if real_modulusLen < 1024 and not _RSA_512_SUPPORTED:
293                # cryptography > 43.0 compatibility
294                private_key = rust_openssl.rsa.generate_private_key(
295                    public_exponent=pubExp,
296                    key_size=real_modulusLen,
297                )
298            else:
299                private_key = rsa.generate_private_key(
300                    public_exponent=pubExp,
301                    key_size=real_modulusLen,
302                    backend=default_backend(),
303                )
304            self.pubkey = private_key.public_key()
305        else:
306            real_modulusLen = len(binrepr(modulus))
307            if modulusLen and real_modulusLen != modulusLen:
308                warning("modulus and modulusLen do not match!")
309            pubNum = rsa.RSAPublicNumbers(n=modulus, e=pubExp)
310            self.pubkey = pubNum.public_key(default_backend())
311        # Lines below are only useful for the legacy part of pkcs1.py
312        pubNum = self.pubkey.public_numbers()
313        self._modulusLen = real_modulusLen
314        self._modulus = pubNum.n
315        self._pubExp = pubNum.e
316
317    @crypto_validator
318    def import_from_tuple(self, tup):
319        # this is rarely used
320        e, m, mLen = tup
321        if isinstance(m, bytes):
322            m = pkcs_os2ip(m)
323        if isinstance(e, bytes):
324            e = pkcs_os2ip(e)
325        self.fill_and_store(modulus=m, pubExp=e)
326        self.pem = self.pubkey.public_bytes(
327            encoding=serialization.Encoding.PEM,
328            format=serialization.PublicFormat.SubjectPublicKeyInfo)
329        self.der = pem2der(self.pem)
330
331    def import_from_asn1pkt(self, pubkey):
332        modulus = pubkey.modulus.val
333        pubExp = pubkey.publicExponent.val
334        self.fill_and_store(modulus=modulus, pubExp=pubExp)
335
336    def encrypt(self, msg, t="pkcs", h="sha256", mgf=None, L=None):
337        # no ECDSA encryption support, hence no ECDSA specific keywords here
338        return _EncryptAndVerifyRSA.encrypt(self, msg, t=t, h=h, mgf=mgf, L=L)
339
340    def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None):
341        return _EncryptAndVerifyRSA.verify(
342            self, msg, sig, t=t, h=h, mgf=mgf, L=L)
343
344
345class PubKeyECDSA(PubKey):
346    """
347    Wrapper for ECDSA keys based on the cryptography library.
348    Use the 'key' attribute to access original object.
349    """
350    @crypto_validator
351    def fill_and_store(self, curve=None):
352        curve = curve or ec.SECP256R1
353        private_key = ec.generate_private_key(curve(), default_backend())
354        self.pubkey = private_key.public_key()
355
356    @crypto_validator
357    def import_from_der(self, pubkey):
358        # No lib support for explicit curves nor compressed points.
359        self.pubkey = serialization.load_der_public_key(
360            pubkey,
361            backend=default_backend(),
362        )
363
364    def encrypt(self, msg, h="sha256", **kwargs):
365        raise Exception("No ECDSA encryption support")
366
367    @crypto_validator
368    def verify(self, msg, sig, h="sha256", **kwargs):
369        # 'sig' should be a DER-encoded signature, as per RFC 3279
370        try:
371            self.pubkey.verify(sig, msg, ec.ECDSA(_get_hash(h)))
372            return True
373        except InvalidSignature:
374            return False
375
376
377class PubKeyEdDSA(PubKey):
378    """
379    Wrapper for EdDSA keys based on the cryptography library.
380    Use the 'key' attribute to access original object.
381    """
382    @crypto_validator
383    def fill_and_store(self, curve=None):
384        curve = curve or x25519.X25519PrivateKey
385        private_key = curve.generate()
386        self.pubkey = private_key.public_key()
387
388    @crypto_validator
389    def import_from_der(self, pubkey):
390        self.pubkey = serialization.load_der_public_key(
391            pubkey,
392            backend=default_backend(),
393        )
394
395    def encrypt(self, msg, **kwargs):
396        raise Exception("No EdDSA encryption support")
397
398    @crypto_validator
399    def verify(self, msg, sig, **kwargs):
400        # 'sig' should be a DER-encoded signature, as per RFC 3279
401        try:
402            self.pubkey.verify(sig, msg)
403            return True
404        except InvalidSignature:
405            return False
406
407
408################
409# Private Keys #
410################
411
412class _PrivKeyFactory(_PKIObjMaker):
413    """
414    Metaclass for PrivKey creation.
415    It casts the appropriate class on the fly, then fills in
416    the appropriate attributes with import_from_asn1pkt() submethod.
417    """
418    def __call__(cls, key_path=None):
419        """
420        key_path may be the path to either:
421            _an RSAPrivateKey_OpenSSL (as generated by openssl);
422            _an ECDSAPrivateKey_OpenSSL (as generated by openssl);
423            _an RSAPrivateKey;
424            _an ECDSAPrivateKey.
425        """
426        if key_path is None:
427            obj = type.__call__(cls)
428            if cls is PrivKey:
429                cls = PrivKeyECDSA
430            obj.__class__ = cls
431            obj.frmt = "original"
432            obj.fill_and_store()
433            return obj
434
435        obj = _PKIObjMaker.__call__(cls, key_path, _MAX_KEY_SIZE)
436        multiPEM = False
437        try:
438            privkey = RSAPrivateKey_OpenSSL(obj.der)
439            privkey = privkey.privateKey
440            obj.__class__ = PrivKeyRSA
441            marker = b"PRIVATE KEY"
442        except Exception:
443            try:
444                privkey = ECDSAPrivateKey_OpenSSL(obj.der)
445                privkey = privkey.privateKey
446                obj.__class__ = PrivKeyECDSA
447                marker = b"EC PRIVATE KEY"
448                multiPEM = True
449            except Exception:
450                try:
451                    privkey = RSAPrivateKey(obj.der)
452                    obj.__class__ = PrivKeyRSA
453                    marker = b"RSA PRIVATE KEY"
454                except Exception:
455                    try:
456                        privkey = ECDSAPrivateKey(obj.der)
457                        obj.__class__ = PrivKeyECDSA
458                        marker = b"EC PRIVATE KEY"
459                    except Exception:
460                        try:
461                            privkey = EdDSAPrivateKey(obj.der)
462                            obj.__class__ = PrivKeyEdDSA
463                            marker = b"PRIVATE KEY"
464                        except Exception:
465                            raise Exception("Unable to import private key")
466        try:
467            obj.import_from_asn1pkt(privkey)
468        except ImportError:
469            pass
470
471        if obj.frmt == "DER":
472            if multiPEM:
473                # this does not restore the EC PARAMETERS header
474                obj.pem = der2pem(raw(privkey), marker)
475            else:
476                obj.pem = der2pem(obj.der, marker)
477        return obj
478
479
480class _Raw_ASN1_BIT_STRING(ASN1_BIT_STRING):
481    """A ASN1_BIT_STRING that ignores BER encoding"""
482    def __bytes__(self):
483        return self.val_readable
484    __str__ = __bytes__
485
486
487class PrivKey(metaclass=_PrivKeyFactory):
488    """
489    Parent class for both PrivKeyRSA and PrivKeyECDSA.
490    Provides common signTBSCert() and resignCert() methods.
491    """
492
493    def signTBSCert(self, tbsCert, h="sha256"):
494        """
495        Note that this will always copy the signature field from the
496        tbsCertificate into the signatureAlgorithm field of the result,
497        regardless of the coherence between its contents (which might
498        indicate ecdsa-with-SHA512) and the result (e.g. RSA signing MD2).
499
500        There is a small inheritance trick for the computation of sigVal
501        below: in order to use a sign() method which would apply
502        to both PrivKeyRSA and PrivKeyECDSA, the sign() methods of the
503        subclasses accept any argument, be it from the RSA or ECDSA world,
504        and then they keep the ones they're interested in.
505        Here, t will be passed eventually to pkcs1._DecryptAndSignRSA.sign().
506        """
507        sigAlg = tbsCert.signature
508        h = h or hash_by_oid[sigAlg.algorithm.val]
509        sigVal = self.sign(raw(tbsCert), h=h, t='pkcs')
510        c = X509_Cert()
511        c.tbsCertificate = tbsCert
512        c.signatureAlgorithm = sigAlg
513        c.signatureValue = _Raw_ASN1_BIT_STRING(sigVal, readable=True)
514        return c
515
516    def resignCert(self, cert):
517        """ Rewrite the signature of either a Cert or an X509_Cert. """
518        return self.signTBSCert(cert.tbsCertificate, h=None)
519
520    def verifyCert(self, cert):
521        """ Verifies either a Cert or an X509_Cert. """
522        tbsCert = cert.tbsCertificate
523        sigAlg = tbsCert.signature
524        h = hash_by_oid[sigAlg.algorithm.val]
525        sigVal = raw(cert.signatureValue)
526        return self.verify(raw(tbsCert), sigVal, h=h, t='pkcs')
527
528
529class PrivKeyRSA(PrivKey, _EncryptAndVerifyRSA, _DecryptAndSignRSA):
530    """
531    Wrapper for RSA keys based on _DecryptAndSignRSA from crypto/pkcs1.py
532    Use the 'key' attribute to access original object.
533    """
534    @crypto_validator
535    def fill_and_store(self, modulus=None, modulusLen=None, pubExp=None,
536                       prime1=None, prime2=None, coefficient=None,
537                       exponent1=None, exponent2=None, privExp=None):
538        pubExp = pubExp or 65537
539        if None in [modulus, prime1, prime2, coefficient, privExp,
540                    exponent1, exponent2]:
541            # note that the library requires every parameter
542            # in order to call RSAPrivateNumbers(...)
543            # if one of these is missing, we generate a whole new key
544            real_modulusLen = modulusLen or 2048
545            if real_modulusLen < 1024 and not _RSA_512_SUPPORTED:
546                # cryptography > 43.0 compatibility
547                self.key = rust_openssl.rsa.generate_private_key(
548                    public_exponent=pubExp,
549                    key_size=real_modulusLen,
550                )
551            else:
552                self.key = rsa.generate_private_key(
553                    public_exponent=pubExp,
554                    key_size=real_modulusLen,
555                    backend=default_backend(),
556                )
557            self.pubkey = self.key.public_key()
558        else:
559            real_modulusLen = len(binrepr(modulus))
560            if modulusLen and real_modulusLen != modulusLen:
561                warning("modulus and modulusLen do not match!")
562            pubNum = rsa.RSAPublicNumbers(n=modulus, e=pubExp)
563            privNum = rsa.RSAPrivateNumbers(p=prime1, q=prime2,
564                                            dmp1=exponent1, dmq1=exponent2,
565                                            iqmp=coefficient, d=privExp,
566                                            public_numbers=pubNum)
567            self.key = privNum.private_key(default_backend())
568            self.pubkey = self.key.public_key()
569
570        # Lines below are only useful for the legacy part of pkcs1.py
571        pubNum = self.pubkey.public_numbers()
572        self._modulusLen = real_modulusLen
573        self._modulus = pubNum.n
574        self._pubExp = pubNum.e
575
576    def import_from_asn1pkt(self, privkey):
577        modulus = privkey.modulus.val
578        pubExp = privkey.publicExponent.val
579        privExp = privkey.privateExponent.val
580        prime1 = privkey.prime1.val
581        prime2 = privkey.prime2.val
582        exponent1 = privkey.exponent1.val
583        exponent2 = privkey.exponent2.val
584        coefficient = privkey.coefficient.val
585        self.fill_and_store(modulus=modulus, pubExp=pubExp,
586                            privExp=privExp, prime1=prime1, prime2=prime2,
587                            exponent1=exponent1, exponent2=exponent2,
588                            coefficient=coefficient)
589
590    def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None):
591        # Let's copy this from PubKeyRSA instead of adding another baseclass :)
592        return _EncryptAndVerifyRSA.verify(
593            self, msg, sig, t=t, h=h, mgf=mgf, L=L)
594
595    def sign(self, data, t="pkcs", h="sha256", mgf=None, L=None):
596        return _DecryptAndSignRSA.sign(self, data, t=t, h=h, mgf=mgf, L=L)
597
598
599class PrivKeyECDSA(PrivKey):
600    """
601    Wrapper for ECDSA keys based on SigningKey from ecdsa library.
602    Use the 'key' attribute to access original object.
603    """
604    @crypto_validator
605    def fill_and_store(self, curve=None):
606        curve = curve or ec.SECP256R1
607        self.key = ec.generate_private_key(curve(), default_backend())
608        self.pubkey = self.key.public_key()
609
610    @crypto_validator
611    def import_from_asn1pkt(self, privkey):
612        self.key = serialization.load_der_private_key(raw(privkey), None,
613                                                      backend=default_backend())  # noqa: E501
614        self.pubkey = self.key.public_key()
615
616    @crypto_validator
617    def verify(self, msg, sig, h="sha256", **kwargs):
618        # 'sig' should be a DER-encoded signature, as per RFC 3279
619        try:
620            self.pubkey.verify(sig, msg, ec.ECDSA(_get_hash(h)))
621            return True
622        except InvalidSignature:
623            return False
624
625    @crypto_validator
626    def sign(self, data, h="sha256", **kwargs):
627        return self.key.sign(data, ec.ECDSA(_get_hash(h)))
628
629
630class PrivKeyEdDSA(PrivKey):
631    """
632    Wrapper for EdDSA keys
633    Use the 'key' attribute to access original object.
634    """
635    @crypto_validator
636    def fill_and_store(self, curve=None):
637        curve = curve or x25519.X25519PrivateKey
638        self.key = curve.generate()
639        self.pubkey = self.key.public_key()
640
641    @crypto_validator
642    def import_from_asn1pkt(self, privkey):
643        self.key = serialization.load_der_private_key(raw(privkey), None,
644                                                      backend=default_backend())  # noqa: E501
645        self.pubkey = self.key.public_key()
646
647    @crypto_validator
648    def verify(self, msg, sig, **kwargs):
649        # 'sig' should be a DER-encoded signature, as per RFC 3279
650        try:
651            self.pubkey.verify(sig, msg)
652            return True
653        except InvalidSignature:
654            return False
655
656    @crypto_validator
657    def sign(self, data, **kwargs):
658        return self.key.sign(data)
659
660
661################
662# Certificates #
663################
664
665class _CertMaker(_PKIObjMaker):
666    """
667    Metaclass for Cert creation. It is not necessary as it was for the keys,
668    but we reuse the model instead of creating redundant constructors.
669    """
670    def __call__(cls, cert_path):
671        obj = _PKIObjMaker.__call__(cls, cert_path,
672                                    _MAX_CERT_SIZE, "CERTIFICATE")
673        obj.__class__ = Cert
674        try:
675            cert = X509_Cert(obj.der)
676        except Exception:
677            if conf.debug_dissector:
678                raise
679            raise Exception("Unable to import certificate")
680        obj.import_from_asn1pkt(cert)
681        return obj
682
683
684class Cert(metaclass=_CertMaker):
685    """
686    Wrapper for the X509_Cert from layers/x509.py.
687    Use the 'x509Cert' attribute to access original object.
688    """
689
690    def import_from_asn1pkt(self, cert):
691        error_msg = "Unable to import certificate"
692
693        self.x509Cert = cert
694
695        tbsCert = cert.tbsCertificate
696        self.tbsCertificate = tbsCert
697
698        if tbsCert.version:
699            self.version = tbsCert.version.val + 1
700        else:
701            self.version = 1
702        self.serial = tbsCert.serialNumber.val
703        self.sigAlg = tbsCert.signature.algorithm.oidname
704        self.issuer = tbsCert.get_issuer()
705        self.issuer_str = tbsCert.get_issuer_str()
706        self.issuer_hash = hash(self.issuer_str)
707        self.subject = tbsCert.get_subject()
708        self.subject_str = tbsCert.get_subject_str()
709        self.subject_hash = hash(self.subject_str)
710        self.authorityKeyID = None
711
712        self.notBefore_str = tbsCert.validity.not_before.pretty_time
713        try:
714            self.notBefore = tbsCert.validity.not_before.datetime.timetuple()
715        except ValueError:
716            raise Exception(error_msg)
717        self.notBefore_str_simple = time.strftime("%x", self.notBefore)
718
719        self.notAfter_str = tbsCert.validity.not_after.pretty_time
720        try:
721            self.notAfter = tbsCert.validity.not_after.datetime.timetuple()
722        except ValueError:
723            raise Exception(error_msg)
724        self.notAfter_str_simple = time.strftime("%x", self.notAfter)
725
726        self.pubKey = PubKey(raw(tbsCert.subjectPublicKeyInfo))
727
728        if tbsCert.extensions:
729            for extn in tbsCert.extensions:
730                if extn.extnID.oidname == "basicConstraints":
731                    self.cA = False
732                    if extn.extnValue.cA:
733                        self.cA = not (extn.extnValue.cA.val == 0)
734                elif extn.extnID.oidname == "keyUsage":
735                    self.keyUsage = extn.extnValue.get_keyUsage()
736                elif extn.extnID.oidname == "extKeyUsage":
737                    self.extKeyUsage = extn.extnValue.get_extendedKeyUsage()
738                elif extn.extnID.oidname == "authorityKeyIdentifier":
739                    self.authorityKeyID = extn.extnValue.keyIdentifier.val
740
741        self.signatureValue = raw(cert.signatureValue)
742        self.signatureLen = len(self.signatureValue)
743
744    def isIssuerCert(self, other):
745        """
746        True if 'other' issued 'self', i.e.:
747          - self.issuer == other.subject
748          - self is signed by other
749        """
750        if self.issuer_hash != other.subject_hash:
751            return False
752        return other.pubKey.verifyCert(self)
753
754    def isSelfSigned(self):
755        """
756        Return True if the certificate is self-signed:
757          - issuer and subject are the same
758          - the signature of the certificate is valid.
759        """
760        if self.issuer_hash == self.subject_hash:
761            return self.isIssuerCert(self)
762        return False
763
764    def encrypt(self, msg, t="pkcs", h="sha256", mgf=None, L=None):
765        # no ECDSA *encryption* support, hence only RSA specific keywords here
766        return self.pubKey.encrypt(msg, t=t, h=h, mgf=mgf, L=L)
767
768    def verify(self, msg, sig, t="pkcs", h="sha256", mgf=None, L=None):
769        return self.pubKey.verify(msg, sig, t=t, h=h, mgf=mgf, L=L)
770
771    def remainingDays(self, now=None):
772        """
773        Based on the value of notAfter field, returns the number of
774        days the certificate will still be valid. The date used for the
775        comparison is the current and local date, as returned by
776        time.localtime(), except if 'now' argument is provided another
777        one. 'now' argument can be given as either a time tuple or a string
778        representing the date. Accepted format for the string version
779        are:
780
781         - '%b %d %H:%M:%S %Y %Z' e.g. 'Jan 30 07:38:59 2008 GMT'
782         - '%m/%d/%y' e.g. '01/30/08' (less precise)
783
784        If the certificate is no more valid at the date considered, then
785        a negative value is returned representing the number of days
786        since it has expired.
787
788        The number of days is returned as a float to deal with the unlikely
789        case of certificates that are still just valid.
790        """
791        if now is None:
792            now = time.localtime()
793        elif isinstance(now, str):
794            try:
795                if '/' in now:
796                    now = time.strptime(now, '%m/%d/%y')
797                else:
798                    now = time.strptime(now, '%b %d %H:%M:%S %Y %Z')
799            except Exception:
800                warning("Bad time string provided, will use localtime() instead.")  # noqa: E501
801                now = time.localtime()
802
803        now = time.mktime(now)
804        nft = time.mktime(self.notAfter)
805        diff = (nft - now) / (24. * 3600)
806        return diff
807
808    def isRevoked(self, crl_list):
809        """
810        Given a list of trusted CRL (their signature has already been
811        verified with trusted anchors), this function returns True if
812        the certificate is marked as revoked by one of those CRL.
813
814        Note that if the Certificate was on hold in a previous CRL and
815        is now valid again in a new CRL and bot are in the list, it
816        will be considered revoked: this is because _all_ CRLs are
817        checked (not only the freshest) and revocation status is not
818        handled.
819
820        Also note that the check on the issuer is performed on the
821        Authority Key Identifier if available in _both_ the CRL and the
822        Cert. Otherwise, the issuers are simply compared.
823        """
824        for c in crl_list:
825            if (self.authorityKeyID is not None and
826                c.authorityKeyID is not None and
827                    self.authorityKeyID == c.authorityKeyID):
828                return self.serial in (x[0] for x in c.revoked_cert_serials)
829            elif self.issuer == c.issuer:
830                return self.serial in (x[0] for x in c.revoked_cert_serials)
831        return False
832
833    def export(self, filename, fmt="DER"):
834        """
835        Export certificate in 'fmt' format (DER or PEM) to file 'filename'
836        """
837        with open(filename, "wb") as f:
838            if fmt == "DER":
839                f.write(self.der)
840            elif fmt == "PEM":
841                f.write(self.pem)
842
843    def show(self):
844        print("Serial: %s" % self.serial)
845        print("Issuer: " + self.issuer_str)
846        print("Subject: " + self.subject_str)
847        print("Validity: %s to %s" % (self.notBefore_str, self.notAfter_str))
848
849    def __repr__(self):
850        return "[X.509 Cert. Subject:%s, Issuer:%s]" % (self.subject_str, self.issuer_str)  # noqa: E501
851
852
853################################
854# Certificate Revocation Lists #
855################################
856
857class _CRLMaker(_PKIObjMaker):
858    """
859    Metaclass for CRL creation. It is not necessary as it was for the keys,
860    but we reuse the model instead of creating redundant constructors.
861    """
862    def __call__(cls, cert_path):
863        obj = _PKIObjMaker.__call__(cls, cert_path, _MAX_CRL_SIZE, "X509 CRL")
864        obj.__class__ = CRL
865        try:
866            crl = X509_CRL(obj.der)
867        except Exception:
868            raise Exception("Unable to import CRL")
869        obj.import_from_asn1pkt(crl)
870        return obj
871
872
873class CRL(metaclass=_CRLMaker):
874    """
875    Wrapper for the X509_CRL from layers/x509.py.
876    Use the 'x509CRL' attribute to access original object.
877    """
878
879    def import_from_asn1pkt(self, crl):
880        error_msg = "Unable to import CRL"
881
882        self.x509CRL = crl
883
884        tbsCertList = crl.tbsCertList
885        self.tbsCertList = raw(tbsCertList)
886
887        if tbsCertList.version:
888            self.version = tbsCertList.version.val + 1
889        else:
890            self.version = 1
891        self.sigAlg = tbsCertList.signature.algorithm.oidname
892        self.issuer = tbsCertList.get_issuer()
893        self.issuer_str = tbsCertList.get_issuer_str()
894        self.issuer_hash = hash(self.issuer_str)
895
896        self.lastUpdate_str = tbsCertList.this_update.pretty_time
897        lastUpdate = tbsCertList.this_update.val
898        if lastUpdate[-1] == "Z":
899            lastUpdate = lastUpdate[:-1]
900        try:
901            self.lastUpdate = time.strptime(lastUpdate, "%y%m%d%H%M%S")
902        except Exception:
903            raise Exception(error_msg)
904        self.lastUpdate_str_simple = time.strftime("%x", self.lastUpdate)
905
906        self.nextUpdate = None
907        self.nextUpdate_str_simple = None
908        if tbsCertList.next_update:
909            self.nextUpdate_str = tbsCertList.next_update.pretty_time
910            nextUpdate = tbsCertList.next_update.val
911            if nextUpdate[-1] == "Z":
912                nextUpdate = nextUpdate[:-1]
913            try:
914                self.nextUpdate = time.strptime(nextUpdate, "%y%m%d%H%M%S")
915            except Exception:
916                raise Exception(error_msg)
917            self.nextUpdate_str_simple = time.strftime("%x", self.nextUpdate)
918
919        if tbsCertList.crlExtensions:
920            for extension in tbsCertList.crlExtensions:
921                if extension.extnID.oidname == "cRLNumber":
922                    self.number = extension.extnValue.cRLNumber.val
923
924        revoked = []
925        if tbsCertList.revokedCertificates:
926            for cert in tbsCertList.revokedCertificates:
927                serial = cert.serialNumber.val
928                date = cert.revocationDate.val
929                if date[-1] == "Z":
930                    date = date[:-1]
931                try:
932                    time.strptime(date, "%y%m%d%H%M%S")
933                except Exception:
934                    raise Exception(error_msg)
935                revoked.append((serial, date))
936        self.revoked_cert_serials = revoked
937
938        self.signatureValue = raw(crl.signatureValue)
939        self.signatureLen = len(self.signatureValue)
940
941    def isIssuerCert(self, other):
942        # This is exactly the same thing as in Cert method.
943        if self.issuer_hash != other.subject_hash:
944            return False
945        return other.pubKey.verifyCert(self)
946
947    def verify(self, anchors):
948        # Return True iff the CRL is signed by one of the provided anchors.
949        return any(self.isIssuerCert(a) for a in anchors)
950
951    def show(self):
952        print("Version: %d" % self.version)
953        print("sigAlg: " + self.sigAlg)
954        print("Issuer: " + self.issuer_str)
955        print("lastUpdate: %s" % self.lastUpdate_str)
956        print("nextUpdate: %s" % self.nextUpdate_str)
957
958
959######################
960# Certificate chains #
961######################
962
963class Chain(list):
964    """
965    Basically, an enhanced array of Cert.
966    """
967
968    def __init__(self, certList, cert0=None):
969        """
970        Construct a chain of certificates starting with a self-signed
971        certificate (or any certificate submitted by the user)
972        and following issuer/subject matching and signature validity.
973        If there is exactly one chain to be constructed, it will be,
974        but if there are multiple potential chains, there is no guarantee
975        that the retained one will be the longest one.
976        As Cert and CRL classes both share an isIssuerCert() method,
977        the trailing element of a Chain may alternatively be a CRL.
978
979        Note that we do not check AKID/{SKID/issuer/serial} matching,
980        nor the presence of keyCertSign in keyUsage extension (if present).
981        """
982        list.__init__(self, ())
983        if cert0:
984            self.append(cert0)
985        else:
986            for root_candidate in certList:
987                if root_candidate.isSelfSigned():
988                    self.append(root_candidate)
989                    certList.remove(root_candidate)
990                    break
991
992        if len(self) > 0:
993            while certList:
994                tmp_len = len(self)
995                for c in certList:
996                    if c.isIssuerCert(self[-1]):
997                        self.append(c)
998                        certList.remove(c)
999                        break
1000                if len(self) == tmp_len:
1001                    # no new certificate appended to self
1002                    break
1003
1004    def verifyChain(self, anchors, untrusted=None):
1005        """
1006        Perform verification of certificate chains for that certificate.
1007        A list of anchors is required. The certificates in the optional
1008        untrusted list may be used as additional elements to the final chain.
1009        On par with chain instantiation, only one chain constructed with the
1010        untrusted candidates will be retained. Eventually, dates are checked.
1011        """
1012        untrusted = untrusted or []
1013        for a in anchors:
1014            chain = Chain(self + untrusted, a)
1015            if len(chain) == 1:             # anchor only
1016                continue
1017            # check that the chain does not exclusively rely on untrusted
1018            if any(c in chain[1:] for c in self):
1019                for c in chain:
1020                    if c.remainingDays() < 0:
1021                        break
1022                if c is chain[-1]:      # we got to the end of the chain
1023                    return chain
1024        return None
1025
1026    def verifyChainFromCAFile(self, cafile, untrusted_file=None):
1027        """
1028        Does the same job as .verifyChain() but using the list of anchors
1029        from the cafile. As for .verifyChain(), a list of untrusted
1030        certificates can be passed (as a file, this time).
1031        """
1032        try:
1033            with open(cafile, "rb") as f:
1034                ca_certs = f.read()
1035        except Exception:
1036            raise Exception("Could not read from cafile")
1037
1038        anchors = [Cert(c) for c in split_pem(ca_certs)]
1039
1040        untrusted = None
1041        if untrusted_file:
1042            try:
1043                with open(untrusted_file, "rb") as f:
1044                    untrusted_certs = f.read()
1045            except Exception:
1046                raise Exception("Could not read from untrusted_file")
1047            untrusted = [Cert(c) for c in split_pem(untrusted_certs)]
1048
1049        return self.verifyChain(anchors, untrusted)
1050
1051    def verifyChainFromCAPath(self, capath, untrusted_file=None):
1052        """
1053        Does the same job as .verifyChainFromCAFile() but using the list
1054        of anchors in capath directory. The directory should (only) contain
1055        certificates files in PEM format. As for .verifyChainFromCAFile(),
1056        a list of untrusted certificates can be passed as a file
1057        (concatenation of the certificates in PEM format).
1058        """
1059        try:
1060            anchors = []
1061            for cafile in os.listdir(capath):
1062                with open(os.path.join(capath, cafile), "rb") as fd:
1063                    anchors.append(Cert(fd.read()))
1064        except Exception:
1065            raise Exception("capath provided is not a valid cert path")
1066
1067        untrusted = None
1068        if untrusted_file:
1069            try:
1070                with open(untrusted_file, "rb") as f:
1071                    untrusted_certs = f.read()
1072            except Exception:
1073                raise Exception("Could not read from untrusted_file")
1074            untrusted = [Cert(c) for c in split_pem(untrusted_certs)]
1075
1076        return self.verifyChain(anchors, untrusted)
1077
1078    def __repr__(self):
1079        llen = len(self) - 1
1080        if llen < 0:
1081            return ""
1082        c = self[0]
1083        s = "__ "
1084        if not c.isSelfSigned():
1085            s += "%s [Not Self Signed]\n" % c.subject_str
1086        else:
1087            s += "%s [Self Signed]\n" % c.subject_str
1088        idx = 1
1089        while idx <= llen:
1090            c = self[idx]
1091            s += "%s_ %s" % (" " * idx * 2, c.subject_str)
1092            if idx != llen:
1093                s += "\n"
1094            idx += 1
1095        return s
1096