• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) 2007, 2008, 2009 Arnaud Ebalard
5#               2015, 2016, 2017 Maxence Tury
6#               2019 Romain Perez
7
8"""
9TLS session handler.
10"""
11
12import binascii
13import collections
14import socket
15import struct
16
17from scapy.config import conf
18from scapy.compat import raw
19from scapy.error import log_runtime, warning
20from scapy.packet import Packet
21from scapy.pton_ntop import inet_pton
22from scapy.sessions import TCPSession
23from scapy.utils import repr_hex, strxor
24from scapy.layers.inet import TCP
25from scapy.layers.tls.crypto.compression import Comp_NULL
26from scapy.layers.tls.crypto.hkdf import TLS13_HKDF
27from scapy.layers.tls.crypto.prf import PRF
28
29# Typing imports
30from typing import Dict
31
32
def load_nss_keys(filename):
    # type: (str) -> Dict[str, Dict[bytes, bytes]]
    """
    Parse an NSS Key Log file and return the secrets it contains.

    Lines starting with ``#`` are skipped. Every other line must consist of
    exactly three space-separated fields: an upper-case label, a hex-encoded
    ClientRandom and a hex-encoded secret.

    :param filename: path of the NSS Key Log file.
    :return: a mapping ``{label: {client_random: secret}}`` where both the
        ClientRandom and the secret are raw bytes. An empty dict is returned
        (after a warning) if the file does not exist, cannot be decoded as
        text, or contains a malformed entry.
    """
    # http://udn.realityripple.com/docs/Mozilla/Projects/NSS/Key_Log_Format
    keys = collections.defaultdict(dict)

    # Open the file exactly once: probing it with a first open()/close() and
    # reopening it afterwards leaves a window in which the file can vanish,
    # making the second open() raise an unhandled FileNotFoundError.
    try:
        fd = open(filename)
    except FileNotFoundError:
        warning("Cannot open NSS Key Log: %s", filename)
        return {}

    try:
        with fd:
            for line in fd:
                if line.startswith("#"):
                    continue
                data = line.strip().split(" ")
                if len(data) != 3 or data[0] != data[0].upper():
                    warning("Invalid NSS Key Log Entry: %s", line.strip())
                    return {}

                try:
                    client_random = binascii.unhexlify(data[1])
                except ValueError:
                    warning("Invalid ClientRandom: %s", data[1])
                    return {}

                try:
                    secret = binascii.unhexlify(data[2])
                except ValueError:
                    warning("Invalid Secret: %s", data[2])
                    return {}

                # Warn that a duplicated entry was detected. The latest one
                # will be kept in the resulting dictionary.
                if client_random in keys[data[0]]:
                    warning("Duplicated entry for %s !", data[0])

                keys[data[0]][client_random] = secret
        return keys
    except UnicodeDecodeError as ex:
        # Decoding happens lazily while iterating over the file.
        warning("Cannot read NSS Key Log: %s %s", filename, str(ex))
        return {}
78
79
80# Note the following import may happen inside connState.__init__()
81# in order to avoid cyclical dependencies.
82# from scapy.layers.tls.crypto.suites import TLS_NULL_WITH_NULL_NULL
83
84
85###############################################################################
86#   Connection states                                                         #
87###############################################################################
88
class connState(object):
    """
    One direction (read or write) of a TLS connection.

    RFC 5246, section 6.1, defines a TLS connection state as the operating
    environment of the TLS Record Protocol: a compression algorithm, an
    encryption algorithm and a MAC algorithm, together with their parameters
    (the MAC key and the bulk encryption keys, for both the read and the
    write directions). Four states are logically outstanding at any time:
    the current read and write states, and the pending read and write
    states. Records are processed under the current states; the pending
    states receive their security parameters from the Handshake Protocol
    and a ChangeCipherSpec makes a pending state current, after which the
    pending state is reinitialized to an empty state. Per the RFC, it is
    illegal to make current a state which was not initialized with security
    parameters, and the initial current state specifies that no encryption,
    compression, or MAC will be used.

    (For practical reasons, Scapy scraps these last two rules, through the
    implementation of dummy ciphers and MAC with TLS_NULL_WITH_NULL_NULL.)

    These attributes and behaviours are mostly mapped in this class.
    Note also that Scapy may make current a pending state which only holds
    dummy security parameters. This is how it keeps track of the fact that
    the content of a TLS message is encrypted, whether or not the right
    keys are available. For instance, after parsing a CKE without any
    secret and then a CCS, Scapy must know that the following Finished is
    protected by a new cipher suite, even though it can neither decipher
    the message nor verify its integrity.
    """

    def __init__(self,
                 connection_end="server",
                 read_or_write="read",
                 seq_num=0,
                 compression_alg=Comp_NULL,
                 ciphersuite=None,
                 tls_version=0x0303):
        """
        Build a state holding key-less instances of the suite algorithms.

        :param connection_end: "client" or "server".
        :param read_or_write: "read" or "write", stored as ``self.row``.
        :param seq_num: initial record sequence number.
        :param compression_alg: compression class, instantiated here.
        :param ciphersuite: cipher suite class; TLS_NULL_WITH_NULL_NULL
            when None.
        :param tls_version: protocol version as an integer.
        """
        self.tls_version = tls_version

        # Keeping the record seq_num under 2**64-1 is up to the user. Once
        # this value is maxed out, the TLS class in record.py will crash
        # when trying to encode it with struct.pack().
        self.seq_num = seq_num

        self.connection_end = connection_end
        self.row = read_or_write

        if ciphersuite is None:
            # Imported lazily to avoid a cyclical dependency.
            from scapy.layers.tls.crypto.suites import TLS_NULL_WITH_NULL_NULL
            ciphersuite = TLS_NULL_WITH_NULL_NULL
        self.ciphersuite = ciphersuite(tls_version=tls_version)

        if not self.ciphersuite.usable:
            warning("TLS cipher suite not usable. "
                    "Is the cryptography Python module installed?")
            return

        self.compression = compression_alg()
        self.key_exchange = ciphersuite.kx_alg()
        self.cipher = ciphersuite.cipher_alg()
        self.hash = ciphersuite.hash_alg()

        newer_than_sslv2 = tls_version > 0x0200
        if newer_than_sslv2 and ciphersuite.cipher_alg.type == "aead":
            # AEAD ciphers authenticate by themselves: no separate HMAC.
            self.hmac = None
            self.mac_len = self.cipher.tag_len
        else:
            # For SSLv2 this should be Hmac_NULL.
            self.hmac = ciphersuite.hmac_alg()
            if newer_than_sslv2:
                self.mac_len = self.hmac.hmac_len
            else:
                self.mac_len = self.hash.hash_len

        if tls_version >= 0x0304:
            self.hkdf = TLS13_HKDF(self.hash.name.lower())
        else:
            self.prf = PRF(ciphersuite.hash_alg.name, tls_version)

    def debug_repr(self, name, secret):
        """Log a non-empty secret in hex form when conf.debug_tls is set."""
        if not (conf.debug_tls and secret):
            return
        log_runtime.debug("TLS: %s %s %s: %s",
                          self.connection_end,
                          self.row,
                          name,
                          repr_hex(secret))

    def derive_keys(self,
                    client_random=b"",
                    server_random=b"",
                    master_secret=b""):
        """
        Derive the key block through the PRF and (re)instantiate self.hmac
        and self.cipher with the secrets matching this state's side and
        direction.
        """
        # XXX Can this be called over a non-usable suite? What happens then?

        suite = self.ciphersuite

        # Derive the keys according to the cipher type and protocol version
        material = self.prf.derive_key_block(master_secret,
                                             server_random,
                                             client_random,
                                             suite.key_block_len)

        # Each secret comes as a pair in the key block. A client reading
        # and a server writing both use the second element of the pair.
        second_of_pair = (
            (self.connection_end == "client" and self.row == "read") or
            (self.connection_end == "server" and self.row == "write")
        )

        def carve(offset, size):
            # Slice our element out of the pair starting at 'offset'.
            begin = offset + size if second_of_pair else offset
            return material[begin:begin + size]

        cipher_cls = suite.cipher_alg
        kind = cipher_cls.type
        cursor = 0

        # MAC secret (for block and stream ciphers)
        if kind == "stream" or kind == "block":
            mac_key_len = suite.hmac_alg.key_len
            mac_key = carve(cursor, mac_key_len)
            self.debug_repr("mac_secret", mac_key)
            cursor += 2 * mac_key_len
        else:
            mac_key = None

        # Cipher secret
        cipher_key = carve(cursor, cipher_cls.key_len)
        if suite.kx_alg.export:
            cipher_key = self.prf.postprocess_key_for_export(
                cipher_key,
                client_random,
                server_random,
                self.connection_end,
                self.row,
                cipher_cls.expanded_key_len,
            )
        self.debug_repr("cipher_secret", cipher_key)
        cursor += 2 * cipher_cls.key_len

        # Now we have the secrets, we can instantiate the algorithms
        if suite.hmac_alg is None:         # AEAD
            self.hmac = None
            self.mac_len = cipher_cls.tag_len
        else:
            self.hmac = suite.hmac_alg(mac_key)
            self.mac_len = self.hmac.hmac_len

        # The implicit IV (block and AEAD ciphers) sits at 'cursor'.
        if kind == "stream":
            cipher = cipher_cls(cipher_key)
        elif kind == "block":
            # An IV is set every time, even though it does not matter for
            # TLS 1.1+ which requires an explicit IV. Indeed the cipher.iv
            # would get updated in TLS.post_build() or TLS.pre_dissect().
            iv = carve(cursor, cipher_cls.block_size)
            if suite.kx_alg.export:
                iv = self.prf.generate_iv_for_export(client_random,
                                                     server_random,
                                                     self.connection_end,
                                                     self.row,
                                                     cipher_cls.block_size)
            cipher = cipher_cls(cipher_key, iv)
            self.debug_repr("block iv", iv)
        elif kind == "aead":
            fixed_iv = carve(cursor, cipher_cls.fixed_iv_len)
            # If you ever wanted to set a random nonce_explicit, use this:
            # exp_bit_len = cipher_alg.nonce_explicit_len * 8
            # nonce_explicit_init = random.randint(0, 2**exp_bit_len - 1)
            explicit_nonce_start = 0
            cipher = cipher_cls(cipher_key, fixed_iv, explicit_nonce_start)
            self.debug_repr("aead fixed iv", fixed_iv)
        self.cipher = cipher

    def sslv2_derive_keys(self, key_material):
        """
        There is actually only one key, the CLIENT-READ-KEY or -WRITE-KEY.

        The half of the key material being used is the opposite of the one
        selected by the SSLv3 derivation.

        Also, if needed, the IV should be set elsewhere.
        """
        uses_first_half = (
            (self.connection_end == "client" and self.row == "read") or
            (self.connection_end == "server" and self.row == "write")
        )

        cipher_cls = self.ciphersuite.cipher_alg

        begin = 0 if uses_first_half else cipher_cls.key_len
        key = key_material[begin:begin + cipher_cls.key_len]
        self.cipher = cipher_cls(key)
        self.debug_repr("cipher_secret", key)

    def tls13_derive_keys(self, key_material):
        """
        Expand a TLS 1.3 traffic secret into a key and an IV (labels "key"
        and "iv") and instantiate self.cipher with them.
        """
        cipher_cls = self.ciphersuite.cipher_alg
        expand = self.hkdf.expand_label
        traffic_key = expand(key_material, b"key", b"", cipher_cls.key_len)
        traffic_iv = expand(key_material, b"iv", b"", cipher_cls.fixed_iv_len)
        self.cipher = cipher_cls(traffic_key, traffic_iv)

    def snapshot(self):
        """
        This is used mostly as a way to keep the cipher state and the seq_num.
        """
        clone = connState(connection_end=self.connection_end,
                          read_or_write=self.row,
                          seq_num=self.seq_num,
                          compression_alg=type(self.compression),
                          ciphersuite=type(self.ciphersuite),
                          tls_version=self.tls_version)
        clone.cipher = self.cipher.snapshot()
        if self.hmac:
            clone.hmac.key = self.hmac.key
        return clone

    def __repr__(self):
        rows = [
            "Connection end : %s\n" % self.connection_end.upper(),
            "Cipher suite   : %s (0x%04x)\n" % (self.ciphersuite.name,
                                                self.ciphersuite.val),
            "Compression    : %s (0x%02x)\n" % (self.compression.name,
                                                self.compression.val),
        ]
        return "".join(rows).expandtabs(4)
332
class readConnState(connState):
    """A connState whose direction is fixed to "read"."""

    def __init__(self, **kwargs):
        super(readConnState, self).__init__(read_or_write="read", **kwargs)
336
337
class writeConnState(connState):
    """A connState whose direction is fixed to "write"."""

    def __init__(self, **kwargs):
        super(writeConnState, self).__init__(read_or_write="write", **kwargs)
341
342
343###############################################################################
344#   TLS session                                                               #
345###############################################################################
346
347class tlsSession(object):
348    """
349    This is our TLS context, which gathers information from both sides of the
350    TLS connection. These sides are represented by a readConnState instance and
351    a writeConnState instance. Along with overarching network attributes, a
352    tlsSession object also holds negotiated, shared information, such as the
353    key exchange parameters and the master secret (when available).
354
355    The default connection_end is "server". This corresponds to the expected
356    behaviour for static exchange analysis (with a ClientHello parsed first).
357    """
358
359    def __init__(self,
360                 ipsrc=None, ipdst=None,
361                 sport=None, dport=None, sid=None,
362                 connection_end="server",
363                 wcs=None, rcs=None):
364
365        # Use this switch to prevent additions to the 'handshake_messages'.
366        self.frozen = False
367
368        # Network settings
369        self.ipsrc = ipsrc
370        self.ipdst = ipdst
371        self.sport = sport
372        self.dport = dport
373        self.sid = sid
374
375        # Identify duplicate sessions
376        self.firsttcp = None
377
378        # Our TCP socket. None until we send (or receive) a packet.
379        self.sock = None
380
381        # Connection states
382        self.connection_end = connection_end
383
384        if wcs is None:
385            # Instantiate wcs with dummy values.
386            self.wcs = writeConnState(connection_end=connection_end)
387            self.wcs.derive_keys()
388        else:
389            self.wcs = wcs
390
391        if rcs is None:
392            # Instantiate rcs with dummy values.
393            self.rcs = readConnState(connection_end=connection_end)
394            self.rcs.derive_keys()
395        else:
396            self.rcs = rcs
397
398        # The pending write/read states are updated by the building/parsing
399        # of various TLS packets. They get committed to self.wcs/self.rcs
400        # once Scapy builds/parses a ChangeCipherSpec message, or for certain
401        # other messages in case of TLS 1.3.
402        self.pwcs = None
403        self.triggered_pwcs_commit = False
404        self.prcs = None
405        self.triggered_prcs_commit = False
406
407        # Certificates and private keys
408
409        # The server certificate chain, as a list of Cert instances.
410        # Either we act as server and it has to be provided, or it is expected
411        # to be sent by the server through a Certificate message.
412        # The server certificate should be self.server_certs[0].
413        self.server_certs = []
414
415        # The server private key, as a PrivKey instance, when acting as server.
416        # XXX It would be nice to be able to provide both an RSA and an ECDSA
417        # key in order for the same Scapy server to support both families of
418        # cipher suites. See INIT_TLS_SESSION() in automaton_srv.py.
419        # (For now server_key holds either one of both types for DHE
420        # authentication, while server_rsa_key is used only for RSAkx.)
421        self.server_key = None
422        self.server_rsa_key = None
423        # self.server_ecdsa_key = None
424
425        # A dictionary containing keys extracted from a NSS Keys Log using
426        # the load_nss_keys() function.
427        self.nss_keys = None
428
429        # Back in the dreadful EXPORT days, US servers were forbidden to use
430        # RSA keys longer than 512 bits for RSAkx. When their usual RSA key
431        # was longer than this, they had to create a new key and send it via
432        # a ServerRSAParams message. When receiving such a message,
433        # Scapy stores this key in server_tmp_rsa_key as a PubKey instance.
434        self.server_tmp_rsa_key = None
435
436        # When client authentication is performed, we need at least a
437        # client certificate chain. If we act as client, we also have
438        # to provide the key associated with the first certificate.
439        self.client_certs = []
440        self.client_key = None
441
442        # Ephemeral key exchange parameters
443
444        # The agreed-upon ephemeral key group
445        self.kx_group = None
446
447        # These are the group/curve parameters, needed to hold the information
448        # e.g. from receiving an SKE to sending a CKE. Usually, only one of
449        # these attributes will be different from None.
450        self.client_kx_ffdh_params = None
451        self.client_kx_ecdh_params = None
452
453        # These are PrivateKeys and PublicKeys from the appropriate FFDH/ECDH
454        # cryptography module, i.e. these are not raw bytes. Usually, only one
455        # in two will be different from None, e.g. when being a TLS client you
456        # will need the client_kx_privkey (the serialized public key is not
457        # actually registered) and you will receive a server_kx_pubkey.
458        self.client_kx_privkey = None
459        self.client_kx_pubkey = None
460        self.server_kx_privkey = None
461        self.server_kx_pubkey = None
462
463        # When using TLS 1.3, the tls13_client_pubshares will contain every
464        # potential key share (equate the 'client_kx_pubkey' before) the client
465        # offered, indexed by the id of the FFDH/ECDH group. These dicts
466        # effectively replace the four previous attributes.
467        self.tls13_client_privshares = {}
468        self.tls13_client_pubshares = {}
469        self.tls13_server_privshare = {}
470        self.tls13_server_pubshare = {}
471
472        # Negotiated session parameters
473
474        # The advertised TLS version found in the ClientHello (and
475        # EncryptedPreMasterSecret if used). If acting as server, it is set to
476        # the value advertised by the client in its ClientHello.
477        # The default value corresponds to TLS 1.2 (and TLS 1.3, incidentally).
478        self.advertised_tls_version = 0x0303
479
480        # The agreed-upon TLS version found in the ServerHello.
481        self.tls_version = None
482
483        # These attributes should eventually be known to both sides (SSLv3-TLS 1.2).  # noqa: E501
484        self.client_random = None
485        self.server_random = None
486        self.pre_master_secret = None
487        self.master_secret = None
488
489        # The advertised supported signature algorithms found in the ClientHello
490        # extension. (for TLS 1.2-TLS 1.3 only)
491        self.advertised_sig_algs = []
492        # The agreed-upon signature algorithm (for TLS 1.2-TLS 1.3 only)
493        self.selected_sig_alg = None
494
495        # A session ticket received by the client.
496        self.client_session_ticket = None
497
498        # These attributes should only be used with SSLv2 connections.
499        # We need to keep the KEY-MATERIAL here because it may be reused.
500        self.sslv2_common_cs = []
501        self.sslv2_connection_id = None
502        self.sslv2_challenge = None
503        self.sslv2_challenge_clientcert = None
504        self.sslv2_key_material = None
505
506        # These attributes should only be used with TLS 1.3 connections.
507        self.tls13_psk_secret = None
508        self.tls13_early_secret = None
509        self.tls13_dhe_secret = None
510        self.tls13_handshake_secret = None
511        self.tls13_master_secret = None
512        self.tls13_derived_secrets = {}
513        self.tls13_cert_req_ctxt = False
514        self.post_handshake = False  # whether handshake is done
515        self.post_handshake_auth = False  # whether "Post-Handshake Auth" is used
516        self.tls13_ticket_ciphersuite = None
517        self.tls13_retry = False
518        self.middlebox_compatibility = False
519
520        # Handshake messages needed for Finished computation/validation.
521        # No record layer headers, no HelloRequests, no ChangeCipherSpecs.
522        self.handshake_messages = []
523        self.handshake_messages_parsed = []
524
525        # Post-handshake, handshake messages for post-handshake client authentication
526        self.post_handshake_messages = []
527
528        # Flag, whether we derive the secret as Extended MS or not
529        self.extms = False
530        self.session_hash = None
531
532        self.encrypt_then_mac = False
533
534        # All exchanged TLS packets.
535        # XXX no support for now
536        # self.exchanged_pkts = []
537
538    def __setattr__(self, name, val):
539        if name == "connection_end":
540            if hasattr(self, "rcs") and self.rcs:
541                self.rcs.connection_end = val
542            if hasattr(self, "wcs") and self.wcs:
543                self.wcs.connection_end = val
544            if hasattr(self, "prcs") and self.prcs:
545                self.prcs.connection_end = val
546            if hasattr(self, "pwcs") and self.pwcs:
547                self.pwcs.connection_end = val
548        super(tlsSession, self).__setattr__(name, val)
549
550    # Get infos from underlayer
551
552    def set_underlayer(self, _underlayer):
553        if isinstance(_underlayer, TCP):
554            tcp = _underlayer
555            self.sport = tcp.sport
556            self.dport = tcp.dport
557            try:
558                self.ipsrc = tcp.underlayer.src
559                self.ipdst = tcp.underlayer.dst
560            except AttributeError:
561                pass
562            if self.firsttcp is None:
563                self.firsttcp = tcp.seq
564
565    # Mirroring
566
567    def mirror(self):
568        """
569        This function takes a tlsSession object and swaps the IP addresses,
570        ports, connection ends and connection states. The triggered_commit are
571        also swapped (though it is probably overkill, it is cleaner this way).
572
573        It is useful for static analysis of a series of messages from both the
574        client and the server. In such a situation, it should be used every
575        time the message being read comes from a different side than the one
576        read right before, as the reading state becomes the writing state, and
577        vice versa. For instance you could do::
578
579            client_hello = open('client_hello.raw').read()
580            <read other messages>
581
582            m1 = TLS(client_hello)
583            m2 = TLS(server_hello, tls_session=m1.tls_session.mirror())
584            m3 = TLS(server_cert, tls_session=m2.tls_session)
585            m4 = TLS(client_keyexchange, tls_session=m3.tls_session.mirror())
586        """
587
588        self.ipdst, self.ipsrc = self.ipsrc, self.ipdst
589        self.dport, self.sport = self.sport, self.dport
590
591        self.rcs, self.wcs = self.wcs, self.rcs
592        if self.rcs:
593            self.rcs.row = "read"
594        if self.wcs:
595            self.wcs.row = "write"
596
597        self.prcs, self.pwcs = self.pwcs, self.prcs
598        if self.prcs:
599            self.prcs.row = "read"
600        if self.pwcs:
601            self.pwcs.row = "write"
602
603        self.triggered_prcs_commit, self.triggered_pwcs_commit = \
604            self.triggered_pwcs_commit, self.triggered_prcs_commit
605
606        if self.connection_end == "client":
607            self.connection_end = "server"
608        elif self.connection_end == "server":
609            self.connection_end = "client"
610
611        return self
612
613    # Secrets management for SSLv3 to TLS 1.2
614
615    def compute_master_secret(self):
616        if self.pre_master_secret is None:
617            warning("Missing pre_master_secret while computing master_secret!")
618        if self.client_random is None:
619            warning("Missing client_random while computing master_secret!")
620        if self.server_random is None:
621            warning("Missing server_random while computing master_secret!")
622        if self.extms and self.session_hash is None:
623            warning("Missing session hash while computing master secret!")
624
625        ms = self.pwcs.prf.compute_master_secret(self.pre_master_secret,
626                                                 self.client_random,
627                                                 self.server_random,
628                                                 self.extms,
629                                                 self.session_hash)
630        self.master_secret = ms
631        if conf.debug_tls:
632            log_runtime.debug("TLS: master secret: %s", repr_hex(ms))
633
634    def use_nss_master_secret_if_present(self) -> bool:
635        # Load the master secret from an NSS Key dictionary
636        if not self.nss_keys or "CLIENT_RANDOM" not in self.nss_keys:
637            return False
638        if self.client_random in self.nss_keys["CLIENT_RANDOM"]:
639            self.master_secret = self.nss_keys["CLIENT_RANDOM"][self.client_random]
640            return True
641        return False
642
643    def compute_ms_and_derive_keys(self):
644        if not self.master_secret:
645            self.compute_master_secret()
646
647        self.prcs.derive_keys(client_random=self.client_random,
648                              server_random=self.server_random,
649                              master_secret=self.master_secret)
650        self.pwcs.derive_keys(client_random=self.client_random,
651                              server_random=self.server_random,
652                              master_secret=self.master_secret)
653
654    # Secrets management for SSLv2
655
656    def compute_sslv2_key_material(self):
657        if self.master_secret is None:
658            warning("Missing master_secret while computing key_material!")
659        if self.sslv2_challenge is None:
660            warning("Missing challenge while computing key_material!")
661        if self.sslv2_connection_id is None:
662            warning("Missing connection_id while computing key_material!")
663
664        km = self.pwcs.prf.derive_key_block(self.master_secret,
665                                            self.sslv2_challenge,
666                                            self.sslv2_connection_id,
667                                            2 * self.pwcs.cipher.key_len)
668        self.sslv2_key_material = km
669        if conf.debug_tls:
670            log_runtime.debug("TLS: master secret: %s", repr_hex(self.master_secret))  # noqa: E501
671            log_runtime.debug("TLS: key material: %s", repr_hex(km))
672
673    def compute_sslv2_km_and_derive_keys(self):
674        self.compute_sslv2_key_material()
675        self.prcs.sslv2_derive_keys(key_material=self.sslv2_key_material)
676        self.pwcs.sslv2_derive_keys(key_material=self.sslv2_key_material)
677
678    # Secrets management for TLS 1.3
679
680    def compute_tls13_early_secrets(self, external=False):
681        """
682        This function computes the Early Secret, the binder_key,
683        the client_early_traffic_secret and the
684        early_exporter_master_secret (See RFC8446, section 7.1).
685
686        The parameter external is used for the computation of the
687        binder_key:
688
689        - For external PSK provisioned outside out of TLS, the parameter
690          external must be True.
691        - For resumption PSK, the parameter external must be False.
692
693        If no argument is specified, the label "res binder" will be
694        used by default.
695
696        Ciphers key and IV are updated accordingly for 0-RTT data.
697        self.handshake_messages should be ClientHello only.
698        """
699
700        # if no hash algorithm is set, default to SHA-256
701        if self.prcs and self.prcs.hkdf:
702            hkdf = self.prcs.hkdf
703        elif self.pwcs and self.pwcs.hkdf:
704            hkdf = self.pwcs.hkdf
705        else:
706            hkdf = TLS13_HKDF("sha256")
707
708        if self.tls13_early_secret is None:
709            self.tls13_early_secret = hkdf.extract(None,
710                                                   self.tls13_psk_secret)
711
712        if "binder_key" not in self.tls13_derived_secrets:
713            if external:
714                bk = hkdf.derive_secret(self.tls13_early_secret,
715                                        b"ext binder",
716                                        b"")
717            else:
718                bk = hkdf.derive_secret(self.tls13_early_secret,
719                                        b"res binder",
720                                        b"")
721
722            self.tls13_derived_secrets["binder_key"] = bk
723
724        cets = hkdf.derive_secret(self.tls13_early_secret,
725                                  b"c e traffic",
726                                  b"".join(self.handshake_messages))
727
728        self.tls13_derived_secrets["client_early_traffic_secret"] = cets
729        ees = hkdf.derive_secret(self.tls13_early_secret,
730                                 b"e exp master",
731                                 b"".join(self.handshake_messages))
732        self.tls13_derived_secrets["early_exporter_secret"] = ees
733
734        if self.connection_end == "server":
735            if self.prcs:
736                self.prcs.tls13_derive_keys(cets)
737        elif self.connection_end == "client":
738            if self.pwcs:
739                self.pwcs.tls13_derive_keys(cets)
740
741    def compute_tls13_handshake_secrets(self):
742        """
743        Ciphers key and IV are updated accordingly for Handshake data.
744        self.handshake_messages should be ClientHello...ServerHello.
745        """
746        if self.prcs:
747            hkdf = self.prcs.hkdf
748        elif self.pwcs:
749            hkdf = self.pwcs.hkdf
750        else:
751            warning("No HKDF. This is abnormal.")
752            return
753
754        if self.tls13_early_secret is None:
755            self.tls13_early_secret = hkdf.extract(None,
756                                                   self.tls13_psk_secret)
757
758        secret = hkdf.derive_secret(self.tls13_early_secret, b"derived", b"")
759        self.tls13_handshake_secret = hkdf.extract(secret, self.tls13_dhe_secret)  # noqa: E501
760
761        chts = hkdf.derive_secret(self.tls13_handshake_secret,
762                                  b"c hs traffic",
763                                  b"".join(self.handshake_messages))
764        self.tls13_derived_secrets["client_handshake_traffic_secret"] = chts
765
766        shts = hkdf.derive_secret(self.tls13_handshake_secret,
767                                  b"s hs traffic",
768                                  b"".join(self.handshake_messages))
769        self.tls13_derived_secrets["server_handshake_traffic_secret"] = shts
770
771    def compute_tls13_traffic_secrets(self):
772        """
773        Ciphers key and IV are updated accordingly for Application data.
774        self.handshake_messages should be ClientHello...ServerFinished.
775        """
776        if self.prcs and self.prcs.hkdf:
777            hkdf = self.prcs.hkdf
778        elif self.pwcs and self.pwcs.hkdf:
779            hkdf = self.pwcs.hkdf
780        else:
781            warning("No HKDF. This is abnormal.")
782            return
783
784        tmp = hkdf.derive_secret(self.tls13_handshake_secret,
785                                 b"derived",
786                                 b"")
787        self.tls13_master_secret = hkdf.extract(tmp, None)
788
789        cts0 = hkdf.derive_secret(self.tls13_master_secret,
790                                  b"c ap traffic",
791                                  b"".join(self.handshake_messages))
792        self.tls13_derived_secrets["client_traffic_secrets"] = [cts0]
793
794        sts0 = hkdf.derive_secret(self.tls13_master_secret,
795                                  b"s ap traffic",
796                                  b"".join(self.handshake_messages))
797        self.tls13_derived_secrets["server_traffic_secrets"] = [sts0]
798
799        es = hkdf.derive_secret(self.tls13_master_secret,
800                                b"exp master",
801                                b"".join(self.handshake_messages))
802        self.tls13_derived_secrets["exporter_secret"] = es
803
804        if self.connection_end == "server":
805            # self.prcs.tls13_derive_keys(cts0)
806            self.pwcs.tls13_derive_keys(sts0)
807        elif self.connection_end == "client":
808            # self.pwcs.tls13_derive_keys(cts0)
809            self.prcs.tls13_derive_keys(sts0)
810
811    def compute_tls13_traffic_secrets_end(self):
812        cts0 = self.tls13_derived_secrets["client_traffic_secrets"][0]
813        if self.connection_end == "server":
814            self.prcs.tls13_derive_keys(cts0)
815        elif self.connection_end == "client":
816            self.pwcs.tls13_derive_keys(cts0)
817
818    def compute_tls13_verify_data(self, connection_end, read_or_write,
819                                  handshake_context):
820        # RFC8446 - 4.4
821        # +-----------+-------------------------+-----------------------------+
822        # | Mode      | Handshake Context       | Base Key                    |
823        # +-----------+-------------------------+-----------------------------+
824        # | Server    | ClientHello ... later   | server_handshake_traffic_   |
825        # |           | of EncryptedExtensions/ | secret                      |
826        # |           | CertificateRequest      |                             |
827        # |           |                         |                             |
828        # | Client    | ClientHello ... later   | client_handshake_traffic_   |
829        # |           | of server               | secret                      |
830        # |           | Finished/EndOfEarlyData |                             |
831        # |           |                         |                             |
832        # | Post-     | ClientHello ... client  | client_application_traffic_ |
833        # | Handshake | Finished +              | secret_N                    |
834        # |           | CertificateRequest      |                             |
835        # +-----------+-------------------------+-----------------------------+
836        if self.post_handshake:
837            # RFC8446 - 4.6
838            # TLS also allows other messages to be sent after the main handshake.
839            # These messages use a handshake content type and are encrypted under
840            # the appropriate application traffic key.
841            shts = self.tls13_derived_secrets["server_traffic_secrets"][-1]
842            chts = self.tls13_derived_secrets["client_traffic_secrets"][-1]
843        else:
844            shts = self.tls13_derived_secrets["server_handshake_traffic_secret"]
845            chts = self.tls13_derived_secrets["client_handshake_traffic_secret"]
846        if read_or_write == "read":
847            hkdf = self.rcs.hkdf
848            if connection_end == "client":
849                basekey = shts
850            elif connection_end == "server":
851                basekey = chts
852        elif read_or_write == "write":
853            hkdf = self.wcs.hkdf
854            if connection_end == "client":
855                basekey = chts
856            elif connection_end == "server":
857                basekey = shts
858
859        if not hkdf or not basekey:
860            warning("Missing arguments for verify_data computation!")
861            return None
862        return hkdf.compute_verify_data(basekey, handshake_context)
863
864    def compute_tls13_resumption_secret(self):
865        """
866        self.handshake_messages should be ClientHello...ClientFinished.
867        """
868        if self.connection_end == "server":
869            hkdf = self.prcs.hkdf
870        elif self.connection_end == "client":
871            hkdf = self.pwcs.hkdf
872        rs = hkdf.derive_secret(self.tls13_master_secret,
873                                b"res master",
874                                b"".join(self.handshake_messages))
875        self.tls13_derived_secrets["resumption_secret"] = rs
876
877    def compute_tls13_next_traffic_secrets(self, connection_end, read_or_write):  # noqa : E501
878        """
879        Ciphers key and IV are updated accordingly.
880        """
881        if self.rcs.hkdf:
882            hkdf = self.rcs.hkdf
883            hl = hkdf.hash.digest_size
884        elif self.wcs.hkdf:
885            hkdf = self.wcs.hkdf
886            hl = hkdf.hash.digest_size
887
888        if read_or_write == "read":
889            if connection_end == "client":
890                cts = self.tls13_derived_secrets["client_traffic_secrets"]
891                ctsN = cts[-1]
892                ctsN_1 = hkdf.expand_label(ctsN, b"traffic upd", b"", hl)
893                cts.append(ctsN_1)
894                self.prcs.tls13_derive_keys(ctsN_1)
895            elif connection_end == "server":
896                sts = self.tls13_derived_secrets["server_traffic_secrets"]
897                stsN = sts[-1]
898                stsN_1 = hkdf.expand_label(stsN, b"traffic upd", b"", hl)
899                sts.append(stsN_1)
900
901                self.prcs.tls13_derive_keys(stsN_1)
902
903        elif read_or_write == "write":
904            if connection_end == "client":
905                cts = self.tls13_derived_secrets["client_traffic_secrets"]
906                ctsN = cts[-1]
907                ctsN_1 = hkdf.expand_label(ctsN, b"traffic upd", b"", hl)
908                cts.append(ctsN_1)
909                self.pwcs.tls13_derive_keys(ctsN_1)
910            elif connection_end == "server":
911                sts = self.tls13_derived_secrets["server_traffic_secrets"]
912                stsN = sts[-1]
913                stsN_1 = hkdf.expand_label(stsN, b"traffic upd", b"", hl)
914                sts.append(stsN_1)
915
916                self.pwcs.tls13_derive_keys(stsN_1)
917
918    # Tests for record building/parsing
919
920    def consider_read_padding(self):
921        # Return True if padding is needed. Used by TLSPadField.
922        return (self.rcs.cipher.type == "block" and
923                not (False in self.rcs.cipher.ready.values()))
924
925    def consider_write_padding(self):
926        # Return True if padding is needed. Used by TLSPadField.
927        return self.wcs.cipher.type == "block"
928
929    def use_explicit_iv(self, version, cipher_type):
930        # Return True if an explicit IV is needed. Required for TLS 1.1+
931        # when either a block or an AEAD cipher is used.
932        if cipher_type == "stream":
933            return False
934        return version >= 0x0302
935
936    # Python object management
937
938    def hash(self):
939        s1 = struct.pack("!H", self.sport)
940        s2 = struct.pack("!H", self.dport)
941        family = socket.AF_INET
942        if ':' in self.ipsrc:
943            family = socket.AF_INET6
944        s1 += inet_pton(family, self.ipsrc)
945        s2 += inet_pton(family, self.ipdst)
946        return strxor(s1, s2)
947
948    def eq(self, other):
949        ok = False
950        if (self.sport == other.sport and self.dport == other.dport and
951                self.ipsrc == other.ipsrc and self.ipdst == other.ipdst):
952            ok = True
953
954        if (not ok and
955            self.dport == other.sport and self.sport == other.dport and
956                self.ipdst == other.ipsrc and self.ipsrc == other.ipdst):
957            ok = True
958
959        if ok:
960            if self.sid and other.sid:
961                return self.sid == other.sid
962            return True
963
964        return False
965
966    def repr(self, _underlayer=None):
967        sid = repr(self.sid)
968        if len(sid) > 12:
969            sid = sid[:11] + "..."
970        if _underlayer and _underlayer.dport != self.dport:
971            return "%s:%s > %s:%s" % (self.ipdst, str(self.dport),
972                                      self.ipsrc, str(self.sport))
973        return "%s:%s > %s:%s" % (self.ipsrc, str(self.sport),
974                                  self.ipdst, str(self.dport))
975
    def __repr__(self):
        """Return the endpoint summary built by repr(), without underlayer."""
        return self.repr()
978
979
980###############################################################################
981#   Session singleton                                                         #
982###############################################################################
983
984
class _GenericTLSSessionInheritance(Packet):
    """
    Many classes inside the TLS module need to get access to session-related
    information. For instance, an encrypted TLS record cannot be parsed without
    some knowledge of the cipher suite being used and the secrets which have
    been negotiated. Passing information is also essential to the handshake.
    To this end, various TLS objects inherit from the present class.
    """
    # tls_session: the tlsSession shared by the packets of a connection
    # rcs_snap_init / wcs_snap_init: snapshots of the session read and write
    #   states, taken when the packet object is created
    __slots__ = ["tls_session", "rcs_snap_init", "wcs_snap_init"]
    name = "Dummy Generic TLS Packet"
    fields_desc = []

    def __init__(self, _pkt="", post_transform=None, _internal=0,
                 _underlayer=None, tls_session=None, **fields):
        """
        Attach a tlsSession to the packet, then run Packet.__init__().

        If the packet has no session yet, the tls_session argument is used,
        or a new tlsSession is created. When _underlayer is a TCP layer, the
        session is updated from it, the NSS key log named by
        conf.tls_nss_filename is loaded once, and, if conf.tls_session_enable
        is set, a freshly created session is replaced by the matching one
        from conf.tls_sessions (or registered there if there is none).
        """
        try:
            setme = self.tls_session is None
        except Exception:
            setme = True

        newses = False
        if setme:
            if tls_session is None:
                newses = True
                self.tls_session = tlsSession()
            else:
                self.tls_session = tls_session

        # Keep the connection states as they were before this packet was
        # processed; __bytes__() and show2() start again from them.
        self.rcs_snap_init = self.tls_session.rcs.snapshot()
        self.wcs_snap_init = self.tls_session.wcs.snapshot()

        if isinstance(_underlayer, TCP):
            # Get information from _underlayer
            self.tls_session.set_underlayer(_underlayer)

            # Load a NSS Key Log file
            if conf.tls_nss_filename is not None:
                if conf.tls_nss_keys is None:
                    conf.tls_nss_keys = load_nss_keys(conf.tls_nss_filename)

            if conf.tls_session_enable:
                if newses:
                    s = conf.tls_sessions.find(self.tls_session)
                    if s:
                        if conf.tls_nss_keys is not None:
                            s.nss_keys = conf.tls_nss_keys
                        # Reuse the known session, mirrored if this packet
                        # goes in the other direction.
                        if s.dport == self.tls_session.dport:
                            self.tls_session = s
                        else:
                            self.tls_session = s.mirror()
                    else:
                        if conf.tls_nss_keys is not None:
                            self.tls_session.nss_keys = conf.tls_nss_keys
                        conf.tls_sessions.add(self.tls_session)
            if self.tls_session.connection_end == "server":
                # Fall back on the globally configured server RSA key
                srk = conf.tls_sessions.server_rsa_key
                if not self.tls_session.server_rsa_key and \
                        srk:
                    self.tls_session.server_rsa_key = srk

        Packet.__init__(self, _pkt=_pkt, post_transform=post_transform,
                        _internal=_internal, _underlayer=_underlayer,
                        **fields)

    def __getattr__(self, attr):
        """
        The tls_session should be found only through the normal mechanism.

        __getattr__ is only called once the normal lookup has failed, so a
        missing tls_session yields None instead of being searched in the
        packet fields.
        """
        if attr == "tls_session":
            return None
        return super(_GenericTLSSessionInheritance, self).__getattr__(attr)

    def tls_session_update(self, msg_str):
        """
        post_{build, dissection}_tls_session_update() are used to update the
        tlsSession context. The default definitions below, along with
        tls_session_update(), may prevent code duplication in some cases.
        """
        pass

    def post_build_tls_session_update(self, msg_str):
        """Forward msg_str to tls_session_update()."""
        self.tls_session_update(msg_str)

    def post_dissection_tls_session_update(self, msg_str):
        """Forward msg_str to tls_session_update()."""
        self.tls_session_update(msg_str)

    def copy(self):
        """Return Packet.copy(self), sharing the same tls_session object."""
        pkt = Packet.copy(self)
        pkt.tls_session = self.tls_session
        return pkt

    def clone_with(self, payload=None, **kargs):
        """Return Packet.clone_with(), sharing the same tls_session object."""
        pkt = Packet.clone_with(self, payload=payload, **kargs)
        pkt.tls_session = self.tls_session
        return pkt

    def raw_stateful(self):
        """Build the packet without saving/restoring the session states."""
        return super(_GenericTLSSessionInheritance, self).__bytes__()

    def str_stateful(self):
        """Alias of raw_stateful()."""
        return self.raw_stateful()

    def __bytes__(self):
        """
        The __bytes__ call has to leave the connection states unchanged.
        We also have to delete raw_packet_cache in order to access post_build.

        For performance, the pending connStates are not snapshotted.
        This should not be an issue, but maybe pay attention to this.

        The previous_freeze_state prevents issues with calling a raw() calling
        in turn another raw(), which would unfreeze the session too soon.
        """
        s = self.tls_session
        rcs_snap = s.rcs.snapshot()
        wcs_snap = s.wcs.snapshot()
        rpc_snap = self.raw_packet_cache
        rpcf_snap = self.raw_packet_cache_fields

        # The build uses, as write state, the read state captured when this
        # packet was created (presumably so that a dissected packet is
        # rebuilt with the state it was read with).
        s.wcs = self.rcs_snap_init

        self.raw_packet_cache = None
        previous_freeze_state = s.frozen
        s.frozen = True
        built_packet = super(_GenericTLSSessionInheritance, self).__bytes__()
        s.frozen = previous_freeze_state

        s.rcs = rcs_snap
        s.wcs = wcs_snap
        self.raw_packet_cache = rpc_snap
        self.raw_packet_cache_fields = rpcf_snap

        return built_packet
    __str__ = __bytes__

    def show2(self):
        """
        Rebuild the TLS packet with the same context, and then .show() it.
        We need self.__class__ to call the subclass in a dynamic way.

        However we do not want the tls_session.{r,w}cs.seq_num to be updated.
        We have to bring back the init states (it's possible the cipher context
        has been updated because of parsing) but also to keep the current state
        and restore it afterwards (the raw() call may also update the states).

        As in __bytes__(), the previous freeze state of the session is
        restored afterwards, so that a session frozen by an outer call is not
        unfrozen too soon.
        """
        s = self.tls_session
        rcs_snap = s.rcs.snapshot()
        wcs_snap = s.wcs.snapshot()

        s.rcs = self.rcs_snap_init

        built_packet = raw(self)
        previous_freeze_state = s.frozen
        s.frozen = True
        self.__class__(built_packet, tls_session=s).show()
        s.frozen = previous_freeze_state

        s.rcs = rcs_snap
        s.wcs = wcs_snap

    def mysummary(self, first=True):
        """
        Return the summary of the layer along with the [TLS, TLS13] classes.

        When the underlayer is itself a TLS object, only the layer name is
        given; otherwise it is prefixed with "TLS" and the session endpoints.
        """
        from scapy.layers.tls.record import TLS
        from scapy.layers.tls.record_tls13 import TLS13
        if (
            self.underlayer and
            isinstance(self.underlayer, _GenericTLSSessionInheritance)
        ):
            summary = getattr(self, "_name", self.name)
        else:
            _underlayer = None
            if self.underlayer and isinstance(self.underlayer, TCP):
                _underlayer = self.underlayer
            summary = "TLS %s / %s" % (
                self.tls_session.repr(_underlayer=_underlayer),
                getattr(self, "_name", self.name)
            )
        return summary, [TLS, TLS13]

    @classmethod
    def tcp_reassemble(cls, data, metadata, session):
        """
        Dissect data once it holds at least one complete TLS record.

        Used with TCPSession. For the TLS and TLS13 classes, None is returned
        as long as data is shorter than the record it starts with (including
        when the 5-byte record header itself is still incomplete). Any other
        class dissects data right away.
        """
        # Used with TCPSession
        from scapy.layers.tls.record import TLS
        from scapy.layers.tls.record_tls13 import TLS13
        if cls in (TLS, TLS13):
            # The length field sits in bytes 3-4 of the record header: with
            # fewer than 5 bytes it cannot be read yet, so wait for more data
            # instead of letting struct.unpack() raise.
            if len(data) < 5:
                return None
            length = struct.unpack("!H", data[3:5])[0] + 5
            if len(data) >= length:
                # get the underlayer as it is used to populate tls_session
                if "original" not in metadata:
                    return cls(data)
                underlayer = metadata["original"][TCP].copy()
                underlayer.remove_payload()
                # eventually get the tls_session now for TLS.dispatch_hook
                tls_session = None
                if conf.tls_session_enable:
                    s = tlsSession()
                    s.set_underlayer(underlayer)
                    tls_session = conf.tls_sessions.find(s)
                    if tls_session:
                        if tls_session.dport != underlayer.dport:
                            tls_session = tls_session.mirror()
                        if tls_session.firsttcp == underlayer.seq:
                            log_runtime.info(
                                "TLS: session %s is a duplicate of a previous "
                                "dissection. Discard it" % repr(tls_session)
                            )
                            conf.tls_sessions.rem(tls_session, force=True)
                            tls_session = None
                return cls(data, _underlayer=underlayer, tls_session=tls_session)
        else:
            return cls(data)
1193
1194
1195###############################################################################
1196#   Multiple TLS sessions                                                     #
1197###############################################################################
1198
1199class _tls_sessions(object):
1200    def __init__(self):
1201        self.sessions = {}
1202        self.server_rsa_key = None
1203
1204    def add(self, session):
1205        s = self.find(session)
1206        if s:
1207            log_runtime.info("TLS: previous session shall not be overwritten")
1208            return
1209
1210        h = session.hash()
1211        if h in self.sessions:
1212            self.sessions[h].append(session)
1213        else:
1214            self.sessions[h] = [session]
1215
1216    def rem(self, session, force=False):
1217        if not force:
1218            s = self.find(session)
1219            if s:
1220                log_runtime.info("TLS: previous session shall not be overwritten")
1221                return
1222
1223        h = session.hash()
1224        self.sessions[h].remove(session)
1225
1226    def find(self, session):
1227        try:
1228            h = session.hash()
1229        except Exception:
1230            return None
1231        if h in self.sessions:
1232            for k in self.sessions[h]:
1233                if k.eq(session):
1234                    if conf.debug_tls:
1235                        log_runtime.info("TLS: found session matching %s", k)
1236                    return k
1237        if conf.debug_tls:
1238            log_runtime.info("TLS: did not find session matching %s", session)
1239        return None
1240
1241    def __repr__(self):
1242        res = [("First endpoint", "Second endpoint", "Session ID")]
1243        for li in self.sessions.values():
1244            for s in li:
1245                src = "%s[%d]" % (s.ipsrc, s.sport)
1246                dst = "%s[%d]" % (s.ipdst, s.dport)
1247                sid = repr(s.sid)
1248                if len(sid) > 12:
1249                    sid = sid[:11] + "..."
1250                res.append((src, dst, sid))
1251        colwidth = (max(len(y) for y in x) for x in zip(*res))
1252        fmt = "  ".join(map(lambda x: "%%-%ds" % x, colwidth))
1253        return "\n".join(map(lambda x: fmt % x, res))
1254
1255
class TLSSession(TCPSession):
    """
    Deprecated TCPSession wrapper which turns conf.tls_session_enable on.

    Creating an instance emits a deprecation warning and sets
    conf.tls_session_enable to True; the previous value is put back by
    toPacketList(). A truthy "server_rsa_key" keyword argument is stored in
    conf.tls_sessions.server_rsa_key.
    """

    def __init__(self, *args, **kwargs):
        # XXX this doesn't bring any value.
        warning(
            "TLSSession is deprecated and will be removed in a future version. "
            "Please use TCPSession instead with conf.tls_session_enable=True"
        )
        # server_rsa_key is not a TCPSession argument: take it out first
        rsa_key = kwargs.pop("server_rsa_key", None)
        super(TLSSession, self).__init__(*args, **kwargs)
        # Remember the global setting while forcing it on
        self._old_conf_status, conf.tls_session_enable = (
            conf.tls_session_enable, True
        )
        if not rsa_key:
            return
        conf.tls_sessions.server_rsa_key = rsa_key

    def toPacketList(self):
        previous_status = self._old_conf_status
        conf.tls_session_enable = previous_status
        return super(TLSSession, self).toPacketList()
1273
1274
# Instantiate the TLS sessions holder: a single registry, created when this
# module is imported and reachable from anywhere through conf.tls_sessions.
conf.tls_sessions = _tls_sessions()
1277